Skip to content

Commit

Permalink
Revert "Enable require_alias for Bulk requests for all actions when t…
Browse files Browse the repository at this point in the history
…arget is a write alias (elastic#29879)"

This reverts commit 84bf434.
  • Loading branch information
kvch committed Jan 27, 2022
1 parent ca2bd68 commit 60a757e
Show file tree
Hide file tree
Showing 9 changed files with 19 additions and 77 deletions.
2 changes: 0 additions & 2 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Affecting all Beats*

- Enable `require_alias` for Bulk requests for all actions when target is a write alias. {issue}27874[27874] {pull}29879[29879]


*Auditbeat*

Expand Down
9 changes: 4 additions & 5 deletions libbeat/esleg/eslegclient/bulkapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,10 @@ type BulkDeleteAction struct {
}

type BulkMeta struct {
Index string `json:"_index" struct:"_index"`
DocType string `json:"_type,omitempty" struct:"_type,omitempty"`
Pipeline string `json:"pipeline,omitempty" struct:"pipeline,omitempty"`
ID string `json:"_id,omitempty" struct:"_id,omitempty"`
RequireAlias bool `json:"require_alias,omitempty" struct:"require_alias,omitempty"`
Index string `json:"_index" struct:"_index"`
DocType string `json:"_type,omitempty" struct:"_type,omitempty"`
Pipeline string `json:"pipeline,omitempty" struct:"pipeline,omitempty"`
ID string `json:"_id,omitempty" struct:"_id,omitempty"`
}

type bulkRequest struct {
Expand Down
4 changes: 0 additions & 4 deletions libbeat/idxmgmt/std.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,17 +337,13 @@ func (s *ilmIndexSelector) Select(evt *beat.Event) (string, error) {
return idx, err
}

func (s ilmIndexSelector) IsAlias() bool { return true }

func (s indexSelector) Select(evt *beat.Event) (string, error) {
if idx := getEventCustomIndex(evt, s.beatInfo); idx != "" {
return idx, nil
}
return s.sel.Select(evt)
}

func (s indexSelector) IsAlias() bool { return false }

func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string {
if len(evt.Meta) == 0 {
return ""
Expand Down
8 changes: 0 additions & 8 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,6 @@ func (client *Client) createEventBulkMeta(version common.Version, event *beat.Ev
ID: id,
}

if isRequireAliasSupported(version) {
meta.RequireAlias = client.index.IsAlias()
}

if opType == events.OpTypeDelete {
if id != "" {
return eslegclient.BulkDeleteAction{Delete: meta}, nil
Expand All @@ -337,10 +333,6 @@ func (client *Client) createEventBulkMeta(version common.Version, event *beat.Ev
return eslegclient.BulkIndexAction{Index: meta}, nil
}

func isRequireAliasSupported(version common.Version) bool {
return !version.LessThan(common.MustNewVersion("7.10.0"))
}

func (client *Client) getPipeline(event *beat.Event) (string, error) {
if event.Meta != nil {
pipeline, err := events.GetMetaStringValue(*event, events.FieldMetaPipeline)
Expand Down
7 changes: 1 addition & 6 deletions libbeat/outputs/elasticsearch/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,7 @@ func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outpu
}

info := beat.Info{Beat: "libbeat"}
// ILM must be disabled otherwise custom index settings are ignored.
im, _ := idxmgmt.DefaultSupport(nil, info, disabledILMConfig())
im, _ := idxmgmt.DefaultSupport(nil, info, nil)
output, err := makeES(im, info, stats, config)
if err != nil {
t.Fatal(err)
Expand All @@ -439,10 +438,6 @@ func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outpu
return client, client
}

func disabledILMConfig() *common.Config {
return common.MustNewConfigFrom(map[string]interface{}{"setup": map[string]interface{}{"ilm": map[string]interface{}{"enabled": false}}})
}

// setupRoleMapping sets up role mapping for the Kerberos user beats@ELASTIC
func setupRoleMapping(t *testing.T, host string) error {
_, client := connectTestEsWithoutStats(t, map[string]interface{}{
Expand Down
60 changes: 14 additions & 46 deletions libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,49 +409,22 @@ func TestClientWithHeaders(t *testing.T) {

func TestBulkEncodeEvents(t *testing.T) {
cases := map[string]struct {
version string
docType string
config common.MapStr
ilmConfig *common.Config
isAlias bool
events []common.MapStr
version string
docType string
config common.MapStr
events []common.MapStr
}{
"6.x": {
version: "6.8.0",
docType: "doc",
config: common.MapStr{},
ilmConfig: common.NewConfig(),
events: []common.MapStr{{"message": "test"}},
version: "6.8.0",
docType: "doc",
config: common.MapStr{},
events: []common.MapStr{{"message": "test"}},
},
"require_alias not supported": {
version: "7.9.0",
docType: "",
config: common.MapStr{},
ilmConfig: common.NewConfig(),
events: []common.MapStr{{"message": "test"}},
},
"require_alias is supported": {
version: "7.10.0",
docType: "",
config: common.MapStr{},
ilmConfig: common.NewConfig(),
isAlias: true,
events: []common.MapStr{{"message": "test"}},
},
"latest with ILM": {
version: version.GetDefaultVersion(),
docType: "",
config: common.MapStr{},
ilmConfig: common.NewConfig(),
isAlias: true,
events: []common.MapStr{{"message": "test"}},
},
"latest without ILM": {
version: version.GetDefaultVersion(),
docType: "",
config: common.MapStr{},
ilmConfig: disabledILMConfig(),
events: []common.MapStr{{"message": "test"}},
"latest": {
version: version.GetDefaultVersion(),
docType: "",
config: common.MapStr{},
events: []common.MapStr{{"message": "test"}},
},
}

Expand All @@ -464,7 +437,7 @@ func TestBulkEncodeEvents(t *testing.T) {
Version: test.version,
}

im, err := idxmgmt.DefaultSupport(nil, info, test.ilmConfig)
im, err := idxmgmt.DefaultSupport(nil, info, common.NewConfig())
require.NoError(t, err)

index, pipeline, err := buildSelectors(im, info, cfg)
Expand Down Expand Up @@ -506,7 +479,6 @@ func TestBulkEncodeEvents(t *testing.T) {
}

assert.NotEqual(t, "", meta.Index)
assert.Equal(t, test.isAlias, meta.RequireAlias)
assert.Equal(t, test.docType, meta.DocType)
}

Expand All @@ -515,10 +487,6 @@ func TestBulkEncodeEvents(t *testing.T) {
}
}

func disabledILMConfig() *common.Config {
return common.MustNewConfigFrom(map[string]interface{}{"setup": map[string]interface{}{"ilm": map[string]interface{}{"enabled": false}}})
}

func TestBulkEncodeEventsWithOpType(t *testing.T) {
cases := []common.MapStr{
{"_id": "111", "op_type": e.OpTypeIndex, "message": "test 1", "bulkIndex": 0},
Expand Down
2 changes: 0 additions & 2 deletions libbeat/outputs/elasticsearch/death_letter_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,3 @@ func (d DeadLetterSelector) Select(event *beat.Event) (string, error) {
}
return d.Selector.Select(event)
}

func (d DeadLetterSelector) IsAlias() bool { return false }
2 changes: 0 additions & 2 deletions libbeat/outputs/outil/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ func (s Selector) Select(evt *beat.Event) (string, error) {
return s.sel.sel(evt)
}

func (s Selector) IsAlias() bool { return false }

// IsEmpty checks if the selector is not configured and will always return an empty string.
func (s Selector) IsEmpty() bool {
return s.sel == nilSelector || s.sel == nil
Expand Down
2 changes: 0 additions & 2 deletions libbeat/outputs/output_reg.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ type IndexManager interface {
}

// IndexSelector is used to find the index name an event shall be indexed to.
// It also used to check if during indexing required_alias should be set.
type IndexSelector interface {
Select(event *beat.Event) (string, error)
IsAlias() bool
}

// Group configures and combines multiple clients into load-balanced group of clients
Expand Down

0 comments on commit 60a757e

Please sign in to comment.