diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 6704596f317..de7da05de5b 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -35,5 +35,5 @@ The list below covers the major changes between 7.0.0-rc2 and master only. by `make` and `mage`. Example: `export PYTHON_EXE=python2.7`. {pull}11212[11212] - Prometheus helper for metricbeat contains now `Namespace` field for `prometheus.MetricsMappings` {pull}11424[11424] - Update Jinja2 version to 2.10.1. {pull}11817[11817] -- Reduce idxmgmt.Supporter interface and rework export commands to reuse logic. {pull}11777[11777] +- Reduce idxmgmt.Supporter interface and rework export commands to reuse logic. {pull}11777[11777], {pull}12065[12065] - Update urllib3 version to 1.24.2 {pull}11930[11930] diff --git a/libbeat/cmd/export/ilm_policy.go b/libbeat/cmd/export/ilm_policy.go index 83981dd3dc7..57a3c32103a 100644 --- a/libbeat/cmd/export/ilm_policy.go +++ b/libbeat/cmd/export/ilm_policy.go @@ -22,6 +22,7 @@ import ( "github.com/elastic/beats/libbeat/cmd/instance" "github.com/elastic/beats/libbeat/idxmgmt" + "github.com/elastic/beats/libbeat/idxmgmt/ilm" ) // GenGetILMPolicyCmd is the command used to export the ilm policy. @@ -33,6 +34,9 @@ func GenGetILMPolicyCmd(settings instance.Settings) *cobra.Command { version, _ := cmd.Flags().GetString("es.version") dir, _ := cmd.Flags().GetString("dir") + if settings.ILM == nil { + settings.ILM = ilm.StdSupport + } b, err := instance.NewInitializedBeat(settings) if err != nil { fatalfInitCmd(err) @@ -40,7 +44,7 @@ func GenGetILMPolicyCmd(settings instance.Settings) *cobra.Command { clientHandler := idxmgmt.NewFileClientHandler(newIdxmgmtClient(dir, version)) idxManager := b.IdxSupporter.Manager(clientHandler, idxmgmt.BeatsAssets(b.Fields)) - if err := idxManager.Setup(idxmgmt.LoadModeDisabled, idxmgmt.LoadModeEnabled); err != nil { + if err := idxManager.Setup(idxmgmt.LoadModeDisabled, idxmgmt.LoadModeForce); err != nil { fatalf("Error exporting ilm-policy: %+v.", err) } }, diff --git a/libbeat/cmd/export/template.go b/libbeat/cmd/export/template.go index 5b74ff57ac5..6dd145ec408 100644 --- a/libbeat/cmd/export/template.go +++ b/libbeat/cmd/export/template.go @@ -20,12 +20,9 @@ package export import ( "github.com/spf13/cobra" - "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cmd/instance" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/idxmgmt" "github.com/elastic/beats/libbeat/idxmgmt/ilm" - "github.com/elastic/beats/libbeat/logp" ) // GenTemplateConfigCmd is the command used to export the elasticsearch template. @@ -39,7 +36,10 @@ func GenTemplateConfigCmd(settings instance.Settings) *cobra.Command { noILM, _ := cmd.Flags().GetBool("noilm") if noILM { - settings.ILM = ilmNoopSupport + settings.ILM = ilm.NoopSupport + } + if settings.ILM == nil { + settings.ILM = ilm.StdSupport } b, err := instance.NewInitializedBeat(settings) @@ -49,7 +49,7 @@ func GenTemplateConfigCmd(settings instance.Settings) *cobra.Command { clientHandler := idxmgmt.NewFileClientHandler(newIdxmgmtClient(dir, version)) idxManager := b.IdxSupporter.Manager(clientHandler, idxmgmt.BeatsAssets(b.Fields)) - if err := idxManager.Setup(idxmgmt.LoadModeEnabled, idxmgmt.LoadModeDisabled); err != nil { + if err := idxManager.Setup(idxmgmt.LoadModeForce, idxmgmt.LoadModeDisabled); err != nil { fatalf("Error exporting template: %+v.", err) } }, @@ -61,7 +61,3 @@ func GenTemplateConfigCmd(settings instance.Settings) *cobra.Command { return genTemplateConfigCmd } - -func ilmNoopSupport(_ *logp.Logger, info beat.Info, config *common.Config) (ilm.Supporter, error) { - return ilm.NoopSupport(info, config) -} diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index e8ac6270d64..78ea2ab62fa 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -490,10 +490,10 @@ func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) er m := b.IdxSupporter.Manager(idxmgmt.NewESClientHandler(esClient), idxmgmt.BeatsAssets(b.Fields)) var tmplLoadMode, ilmLoadMode = idxmgmt.LoadModeUnset, idxmgmt.LoadModeUnset if setup.Template { - tmplLoadMode = idxmgmt.LoadModeForce + tmplLoadMode = idxmgmt.LoadModeOverwrite } if setup.ILMPolicy { - ilmLoadMode = idxmgmt.LoadModeForce + ilmLoadMode = idxmgmt.LoadModeOverwrite } err = m.Setup(tmplLoadMode, ilmLoadMode) diff --git a/libbeat/idxmgmt/idxmgmt.go b/libbeat/idxmgmt/idxmgmt.go index 9181ae8c818..6ec75b033ef 100644 --- a/libbeat/idxmgmt/idxmgmt.go +++ b/libbeat/idxmgmt/idxmgmt.go @@ -74,12 +74,14 @@ const ( // LoadModeUnset indicates that no specific mode is set. // Instead the decision about loading data will be derived from the config or their respective default values. LoadModeUnset LoadMode = iota //unset + // LoadModeDisabled indicates no loading + LoadModeDisabled //disabled // LoadModeEnabled indicates loading if not already available LoadModeEnabled //enabled - // LoadModeForce indicates loading in any case. + // LoadModeOverwrite indicates overwriting existing components, if loading is not generally disabled. + LoadModeOverwrite //overwrite + // LoadModeForce indicates forcing to load components in any case, independent of general loading configurations. LoadModeForce //force - // LoadModeDisabled indicates no loading - LoadModeDisabled //disabled ) // Enabled returns whether or not the LoadMode should be considered enabled diff --git a/libbeat/idxmgmt/idxmgmt_test.go b/libbeat/idxmgmt/idxmgmt_test.go index c5a72ca05bb..01705dfa7a1 100644 --- a/libbeat/idxmgmt/idxmgmt_test.go +++ b/libbeat/idxmgmt/idxmgmt_test.go @@ -268,6 +268,24 @@ func TestDefaultSupport_TemplateHandling(t *testing.T) { loadTemplate: LoadModeEnabled, tmplCfg: &defaultCfg, }, + "template default loadMode Overwrite, ilm disabled": { + cfg: common.MapStr{ + "setup.ilm.enabled": false, + }, + loadTemplate: LoadModeOverwrite, + tmplCfg: cfgWith(template.DefaultConfig(), map[string]interface{}{ + "overwrite": "true", + }), + }, + "template default loadMode Force, ilm disabled": { + cfg: common.MapStr{ + "setup.ilm.enabled": false, + }, + loadTemplate: LoadModeForce, + tmplCfg: cfgWith(template.DefaultConfig(), map[string]interface{}{ + "overwrite": "true", + }), + }, "template loadMode disabled, ilm disabled": { cfg: common.MapStr{ "setup.ilm.enabled": false, @@ -281,6 +299,22 @@ func TestDefaultSupport_TemplateHandling(t *testing.T) { alias: "test-9.9.9", policy: "test-9.9.9", }, + "template disabled, ilm disabled, loadMode Overwrite": { + cfg: common.MapStr{ + "setup.template.enabled": false, + "setup.ilm.enabled": false, + }, + loadILM: LoadModeOverwrite, + }, + "template disabled, ilm disabled, loadMode Force": { + cfg: common.MapStr{ + "setup.template.enabled": false, + "setup.ilm.enabled": false, + }, + loadILM: LoadModeForce, + alias: "test-9.9.9", + policy: "test-9.9.9", + }, "template loadmode disabled, ilm loadMode enabled": { loadTemplate: LoadModeDisabled, loadILM: LoadModeEnabled, @@ -304,7 +338,7 @@ func TestDefaultSupport_TemplateHandling(t *testing.T) { for name, test := range cases { t.Run(name, func(t *testing.T) { info := beat.Info{Beat: "test", Version: "9.9.9"} - factory := MakeDefaultSupport(nil) + factory := MakeDefaultSupport(ilm.StdSupport) im, err := factory(nil, info, common.MustNewConfigFrom(test.cfg)) require.NoError(t, err) diff --git a/libbeat/idxmgmt/ilm/ilm.go b/libbeat/idxmgmt/ilm/ilm.go index d7e803b5742..3ecc85d8c40 100644 --- a/libbeat/idxmgmt/ilm/ilm.go +++ b/libbeat/idxmgmt/ilm/ilm.go @@ -70,6 +70,22 @@ type Alias struct { // DefaultSupport configures a new default ILM support implementation. func DefaultSupport(log *logp.Logger, info beat.Info, config *common.Config) (Supporter, error) { + cfg := defaultConfig(info) + if config != nil { + if err := config.Unpack(&cfg); err != nil { + return nil, err + } + } + + if cfg.Mode == ModeDisabled { + return NewNoopSupport(info, config) + } + + return StdSupport(log, info, config) +} + +// StdSupport configures a new std ILM support implementation. +func StdSupport(log *logp.Logger, info beat.Info, config *common.Config) (Supporter, error) { if log == nil { log = logp.NewLogger("ilm") } else { @@ -83,10 +99,6 @@ func DefaultSupport(log *logp.Logger, info beat.Info, config *common.Config) (Su } } - if cfg.Mode == ModeDisabled { - return NoopSupport(info, config) - } - name, err := applyStaticFmtstr(info, &cfg.PolicyName) if err != nil { return nil, errors.Wrap(err, "failed to read ilm policy name") @@ -115,8 +127,13 @@ func DefaultSupport(log *logp.Logger, info beat.Info, config *common.Config) (Su policy.Body = body } - log.Infof("Policy name: %v", name) - return NewDefaultSupport(log, cfg.Mode, alias, policy, cfg.Overwrite, cfg.CheckExists), nil + return NewStdSupport(log, cfg.Mode, alias, policy, cfg.Overwrite, cfg.CheckExists), nil +} + +// NoopSupport configures a new noop ILM support implementation, +// should be used when ILM is disabled +func NoopSupport(_ *logp.Logger, info beat.Info, config *common.Config) (Supporter, error) { + return NewNoopSupport(info, config) } func applyStaticFmtstr(info beat.Info, fmt *fmtstr.EventFormatString) (string, error) { diff --git a/libbeat/idxmgmt/ilm/ilm_test.go b/libbeat/idxmgmt/ilm/ilm_test.go index 0ca72c86673..9a860e04f64 100644 --- a/libbeat/idxmgmt/ilm/ilm_test.go +++ b/libbeat/idxmgmt/ilm/ilm_test.go @@ -65,7 +65,7 @@ func TestDefaultSupport_Init(t *testing.T) { )) require.NoError(t, err) - s := tmp.(*ilmSupport) + s := tmp.(*stdSupport) assert := assert.New(t) assert.Equal(true, s.overwrite) assert.Equal(false, s.checkExists) diff --git a/libbeat/idxmgmt/ilm/noop.go b/libbeat/idxmgmt/ilm/noop.go index 779bd16c5ea..eda57f24622 100644 --- a/libbeat/idxmgmt/ilm/noop.go +++ b/libbeat/idxmgmt/ilm/noop.go @@ -25,9 +25,9 @@ import ( type noopSupport struct{} type noopManager struct{} -// NoopSupport creates a noop ILM implementation with ILM support being always +// NewNoopSupport creates a noop ILM implementation with ILM support being always // disabled. Attempts to install a policy or create a write alias will fail. -func NoopSupport(info beat.Info, config *common.Config) (Supporter, error) { +func NewNoopSupport(info beat.Info, config *common.Config) (Supporter, error) { return (*noopSupport)(nil), nil } diff --git a/libbeat/idxmgmt/ilm/std.go b/libbeat/idxmgmt/ilm/std.go index c9622eb1847..fb057cbaac3 100644 --- a/libbeat/idxmgmt/ilm/std.go +++ b/libbeat/idxmgmt/ilm/std.go @@ -23,7 +23,7 @@ import ( "github.com/elastic/beats/libbeat/logp" ) -type ilmSupport struct { +type stdSupport struct { log *logp.Logger mode Mode @@ -34,8 +34,8 @@ type ilmSupport struct { policy Policy } -type singlePolicyManager struct { - *ilmSupport +type stdManager struct { + *stdSupport client ClientHandler // cached info @@ -49,15 +49,15 @@ type infoCache struct { var defaultCacheDuration = 5 * time.Minute -// NewDefaultSupport creates an instance of default ILM support implementation. -func NewDefaultSupport( +// NewStdSupport creates an instance of default ILM support implementation. +func NewStdSupport( log *logp.Logger, mode Mode, alias Alias, policy Policy, overwrite, checkExists bool, ) Supporter { - return &ilmSupport{ + return &stdSupport{ log: log, mode: mode, overwrite: overwrite, @@ -67,18 +67,18 @@ func NewDefaultSupport( } } -func (s *ilmSupport) Mode() Mode { return s.mode } -func (s *ilmSupport) Alias() Alias { return s.alias } -func (s *ilmSupport) Policy() Policy { return s.policy } +func (s *stdSupport) Mode() Mode { return s.mode } +func (s *stdSupport) Alias() Alias { return s.alias } +func (s *stdSupport) Policy() Policy { return s.policy } -func (s *ilmSupport) Manager(h ClientHandler) Manager { - return &singlePolicyManager{ +func (s *stdSupport) Manager(h ClientHandler) Manager { + return &stdManager{ client: h, - ilmSupport: s, + stdSupport: s, } } -func (m *singlePolicyManager) Enabled() (bool, error) { +func (m *stdManager) Enabled() (bool, error) { if m.mode == ModeDisabled { return false, nil } @@ -101,7 +101,7 @@ func (m *singlePolicyManager) Enabled() (bool, error) { return enabled, nil } -func (m *singlePolicyManager) EnsureAlias() error { +func (m *stdManager) EnsureAlias() error { b, err := m.client.HasAlias(m.alias.Name) if err != nil { return err @@ -114,7 +114,7 @@ func (m *singlePolicyManager) EnsureAlias() error { return m.client.CreateAlias(m.alias) } -func (m *singlePolicyManager) EnsurePolicy(overwrite bool) (bool, error) { +func (m *stdManager) EnsurePolicy(overwrite bool) (bool, error) { log := m.log overwrite = overwrite || m.overwrite diff --git a/libbeat/idxmgmt/loadmode_string.go b/libbeat/idxmgmt/loadmode_string.go index 0cb74b2242d..967b94abc02 100644 --- a/libbeat/idxmgmt/loadmode_string.go +++ b/libbeat/idxmgmt/loadmode_string.go @@ -26,14 +26,15 @@ func _() { // Re-run the stringer command to generate them again. var x [1]struct{} _ = x[LoadModeUnset-0] - _ = x[LoadModeEnabled-1] - _ = x[LoadModeForce-2] - _ = x[LoadModeDisabled-3] + _ = x[LoadModeDisabled-1] + _ = x[LoadModeEnabled-2] + _ = x[LoadModeOverwrite-3] + _ = x[LoadModeForce-4] } -const _LoadMode_name = "unsetenabledforcedisabled" +const _LoadMode_name = "unsetdisabledenabledoverwriteforce" -var _LoadMode_index = [...]uint8{0, 5, 12, 17, 25} +var _LoadMode_index = [...]uint8{0, 5, 13, 20, 29, 34} func (i LoadMode) String() string { if i >= LoadMode(len(_LoadMode_index)-1) { diff --git a/libbeat/idxmgmt/std.go b/libbeat/idxmgmt/std.go index 2159f1c87e0..ea655bb71a8 100644 --- a/libbeat/idxmgmt/std.go +++ b/libbeat/idxmgmt/std.go @@ -197,12 +197,12 @@ func (m *indexManager) Setup(loadTemplate, loadILM LoadMode) error { } } - if withILM && loadILM.Enabled() { + if loadILM == LoadModeForce || withILM && loadILM.Enabled() { // mark ILM as enabled in indexState if withILM is true m.support.st.withILM.CAS(false, true) // install ilm policy - policyCreated, err := m.ilm.EnsurePolicy(loadILM == LoadModeForce) + policyCreated, err := m.ilm.EnsurePolicy(loadILM >= LoadModeOverwrite) if err != nil { return err } @@ -210,7 +210,7 @@ func (m *indexManager) Setup(loadTemplate, loadILM LoadMode) error { // The template should be updated if a new policy is created. if policyCreated && loadTemplate.Enabled() { - loadTemplate = LoadModeForce + loadTemplate = LoadModeOverwrite } // create alias @@ -225,7 +225,7 @@ func (m *indexManager) Setup(loadTemplate, loadILM LoadMode) error { } // create and install template - if m.support.templateCfg.Enabled && loadTemplate.Enabled() { + if loadTemplate == LoadModeForce || m.support.templateCfg.Enabled && loadTemplate.Enabled() { tmplCfg := m.support.templateCfg if withILM { @@ -237,11 +237,12 @@ func (m *indexManager) Setup(loadTemplate, loadILM LoadMode) error { } if loadTemplate == LoadModeForce { + tmplCfg.Enabled = true + } + if loadTemplate >= LoadModeOverwrite { tmplCfg.Overwrite = true } - fields := m.assets.Fields(m.support.info.Beat) - err = m.clientHandler.Load(tmplCfg, m.support.info, fields, m.support.migration) if err != nil { return fmt.Errorf("error loading template: %v", err) diff --git a/libbeat/tests/system/base.py b/libbeat/tests/system/base.py index 65d96ae9462..7fd9f1dbb49 100644 --- a/libbeat/tests/system/base.py +++ b/libbeat/tests/system/base.py @@ -1,5 +1,6 @@ import os from beat.beat import TestCase +from elasticsearch import Elasticsearch, NotFoundError class BaseTest(TestCase): @@ -16,4 +17,12 @@ def setUpClass(self): "packetbeat", "winlogbeat" ] + self._es = None super(BaseTest, self).setUpClass() + + def es_client(self): + if self._es: + return self._es + + self._es = Elasticsearch([self.get_elasticsearch_url()]) + return self._es diff --git a/libbeat/tests/system/config/libbeat.yml.j2 b/libbeat/tests/system/config/libbeat.yml.j2 index 5ac42df8446..1a61a2258b1 100644 --- a/libbeat/tests/system/config/libbeat.yml.j2 +++ b/libbeat/tests/system/config/libbeat.yml.j2 @@ -25,7 +25,9 @@ setup.template.pattern: "{{setup_template_pattern}}" {% if ilm %} setup.ilm: enabled: {{ ilm.enabled | default("auto") }} - policy_name: libbeat-test-default-policy + {% if ilm.policy_name %} + policy_name: {{ ilm.policy_name }} + {% endif %} {% if ilm.pattern %} pattern: {{ ilm.pattern }} {% endif %} diff --git a/libbeat/tests/system/config/mockbeat.yml.j2 b/libbeat/tests/system/config/mockbeat.yml.j2 index 55f2ec97d5f..df185c8ca58 100644 --- a/libbeat/tests/system/config/mockbeat.yml.j2 +++ b/libbeat/tests/system/config/mockbeat.yml.j2 @@ -82,7 +82,9 @@ setup.template: {% if ilm %} setup.ilm: enabled: {{ ilm.enabled | default("auto") }} - policy_name: libbeat-test-default-policy + {% if ilm.policy_name %} + policy_name: {{ ilm.policy_name }} + {% endif %} {% if ilm.pattern %} pattern: {{ ilm.pattern }} {% endif %} diff --git a/libbeat/tests/system/idxmgmt.py b/libbeat/tests/system/idxmgmt.py new file mode 100644 index 00000000000..caaa7d7d974 --- /dev/null +++ b/libbeat/tests/system/idxmgmt.py @@ -0,0 +1,112 @@ +from elasticsearch import NotFoundError +from nose.tools import raises +import datetime + + +class IdxMgmt(object): + + def __init__(self, client, index): + self._client = client + if index == "": + index == "mockbeat" + self._index = index + + def needs_init(self, s): + return s == '' or s == '*' + + def delete(self, index="", template="", policy=""): + self.delete_index_and_alias(index=index) + self.delete_template(template=template) + self.delete_policy(policy=policy) + + def delete_index_and_alias(self, index=""): + if self.needs_init(index): + index = self._index + + try: + self._client.transport.perform_request('DELETE', "/" + index + "*") + except NotFoundError: + pass + + def delete_template(self, template=""): + if self.needs_init(template): + template = self._index + + try: + self._client.transport.perform_request('DELETE', "/_template/" + template + "*") + except NotFoundError: + pass + + def delete_policy(self, policy=""): + if self.needs_init(policy): + policy = self._index + + # Delete any existing policy starting with given policy + policies = self._client.transport.perform_request('GET', "/_ilm/policy") + for p, _ in policies.items(): + if not p.startswith(policy): + continue + try: + self._client.transport.perform_request('DELETE', "/_ilm/policy/" + p) + except NotFoundError: + pass + + @raises(NotFoundError) + def assert_index_template_not_loaded(self, template): + self._client.transport.perform_request('GET', '/_template/' + template) + + def assert_index_template_loaded(self, template): + resp = self._client.transport.perform_request('GET', '/_template/' + template) + assert template in resp + assert "lifecycle" not in resp[template]["settings"]["index"] + + def assert_ilm_template_loaded(self, template, policy, alias): + resp = self._client.transport.perform_request('GET', '/_template/' + template) + assert resp[template]["settings"]["index"]["lifecycle"]["name"] == policy + assert resp[template]["settings"]["index"]["lifecycle"]["rollover_alias"] == alias + + def assert_index_template_index_pattern(self, template, index_pattern): + resp = self._client.transport.perform_request('GET', '/_template/' + template) + assert template in resp + assert resp[template]["index_patterns"] == index_pattern + + def assert_alias_not_created(self, alias): + resp = self._client.transport.perform_request('GET', '/_alias') + for name, entry in resp.items(): + if alias not in name: + continue + assert entry["aliases"] == {}, entry["aliases"] + + def assert_alias_created(self, alias, pattern=None): + if pattern is None: + pattern = self.default_pattern() + name = alias + "-" + pattern + resp = self._client.transport.perform_request('GET', '/_alias/' + alias) + assert name in resp + assert resp[name]["aliases"][alias]["is_write_index"] == True + + @raises(NotFoundError) + def assert_policy_not_created(self, policy): + self._client.transport.perform_request('GET', '/_ilm/policy/' + policy) + + def assert_policy_created(self, policy): + resp = self._client.transport.perform_request('GET', '/_ilm/policy/' + policy) + assert policy in resp + assert resp[policy]["policy"]["phases"]["hot"]["actions"]["rollover"]["max_size"] == "50gb" + assert resp[policy]["policy"]["phases"]["hot"]["actions"]["rollover"]["max_age"] == "30d" + + def assert_docs_written_to_alias(self, alias, pattern=None): + if pattern is None: + pattern = self.default_pattern() + name = alias + "-" + pattern + data = self._client.transport.perform_request('GET', '/' + name + '/_search') + assert data["hits"]["total"] > 0 + + def default_pattern(self): + d = datetime.datetime.now().strftime("%Y.%m.%d") + return d + "-000001" + + def index_for(self, alias, pattern=None): + if pattern is None: + pattern = self.default_pattern() + return "{}-{}".format(alias, pattern) diff --git a/libbeat/tests/system/test_cmd.py b/libbeat/tests/system/test_cmd.py deleted file mode 100644 index e29af7301db..00000000000 --- a/libbeat/tests/system/test_cmd.py +++ /dev/null @@ -1,196 +0,0 @@ -from base import BaseTest -from nose.plugins.attrib import attr -from elasticsearch import Elasticsearch, TransportError - -import logging -import os -import shutil -import unittest - - -INTEGRATION_TESTS = os.environ.get('INTEGRATION_TESTS', False) - - -class TestCommands(BaseTest): - """ - Test beat subcommands - """ - - def setUp(self): - super(BaseTest, self).setUp() - - self.elasticsearch_url = self.get_elasticsearch_url() - print("Using elasticsearch: {}".format(self.elasticsearch_url)) - self.es = Elasticsearch([self.elasticsearch_url]) - logging.getLogger("urllib3").setLevel(logging.WARNING) - logging.getLogger("elasticsearch").setLevel(logging.ERROR) - - def test_version(self): - """ - Test version command - """ - exit_code = self.run_beat( - extra_args=["version"], logging_args=["-v", "-d", "*"]) - assert exit_code == 0 - - assert self.log_contains("mockbeat") is True - assert self.log_contains("version") is True - assert self.log_contains("9.9.9") is True - - @unittest.skipUnless(INTEGRATION_TESTS, "integration test") - @attr('integration') - def test_setup_template(self): - """ - Test setup -template command - """ - # Delete any existing template - try: - self.es.indices.delete_template('mockbeat-*') - except: - pass - - assert len(self.es.cat.templates(name='mockbeat-*', h='name')) == 0 - - shutil.copy(self.beat_path + "/_meta/config.yml", - os.path.join(self.working_dir, "libbeat.yml")) - shutil.copy(self.beat_path + "/fields.yml", - os.path.join(self.working_dir, "fields.yml")) - - exit_code = self.run_beat( - logging_args=["-v", "-d", "*"], - extra_args=["setup", - "-template", - "-path.config", self.working_dir, - "-E", "output.elasticsearch.hosts=['" + self.get_host() + "']"], - config="libbeat.yml") - - assert exit_code == 0 - assert len(self.es.cat.templates(name='mockbeat-*', h='name')) > 0 - - @unittest.skipUnless(INTEGRATION_TESTS, "integration test") - @attr('integration') - def test_test_config(self): - """ - Test test config command - """ - self.render_config_template("mockbeat", - os.path.join(self.working_dir, "libbeat.yml")) - - exit_code = self.run_beat( - logging_args=[], - extra_args=["test", "config"], - config="libbeat.yml") - - assert exit_code == 0 - assert self.log_contains("Config OK") - - def test_test_bad_config(self): - """ - Test test config command with bad config - """ - exit_code = self.run_beat( - logging_args=[], - extra_args=["test", "config"], - config="libbeat-missing.yml") - - assert exit_code == 1 - assert self.log_contains("Config OK") is False - - def test_export_config(self): - """ - Test export config works - """ - self.render_config_template("mockbeat", - os.path.join(self.working_dir, - "libbeat.yml"), - metrics_period=1234) - - exit_code = self.run_beat( - logging_args=[], - extra_args=["export", "config"], - config="libbeat.yml") - - assert exit_code == 0 - assert self.log_contains("filename: mockbeat") - assert self.log_contains("period: 1234") - - def test_export_config_environment_variable(self): - """ - Test export config works but doesn"t expose environment variable. - """ - self.render_config_template("mockbeat", - os.path.join(self.working_dir, - "libbeat.yml"), - metrics_period="${METRIC_PERIOD}") - - exit_code = self.run_beat( - logging_args=[], - extra_args=["export", "config"], - config="libbeat.yml", env={'METRIC_PERIOD': '1234'}) - - assert exit_code == 0 - assert self.log_contains("filename: mockbeat") - assert self.log_contains("period: ${METRIC_PERIOD}") - - def test_export_template(self): - """ - Test export template works - """ - self.render_config_template("mockbeat", - os.path.join(self.working_dir, - "mockbeat.yml"), - fields=os.path.join(self.working_dir, "fields.yml")) - shutil.copy(self.beat_path + "/fields.yml", - os.path.join(self.working_dir, "fields.yml")) - exit_code = self.run_beat( - logging_args=[], - extra_args=["export", "template"], - config="mockbeat.yml") - - assert exit_code == 0 - assert self.log_contains('"mockbeat-9.9.9-*"') - assert self.log_contains('"codec": "best_compression"') - - @unittest.skipUnless(INTEGRATION_TESTS, "integration test") - @attr('integration') - def test_test_output(self): - """ - Test test output works - """ - self.render_config_template("mockbeat", - os.path.join(self.working_dir, - "mockbeat.yml"), - elasticsearch={"hosts": '["{}"]'.format(self.get_host())}) - exit_code = self.run_beat( - extra_args=["test", "output"], - config="mockbeat.yml") - - assert exit_code == 0 - assert self.log_contains('parse url... OK') - assert self.log_contains('TLS... WARN secure connection disabled') - assert self.log_contains('talk to server... OK') - - def test_test_wrong_output(self): - """ - Test test wrong output works - """ - self.render_config_template("mockbeat", - os.path.join(self.working_dir, - "mockbeat.yml"), - elasticsearch={"hosts": '["badhost:9200"]'}) - exit_code = self.run_beat( - extra_args=["test", "output"], - config="mockbeat.yml") - - assert exit_code == 1 - assert self.log_contains('parse url... OK') - assert self.log_contains('dns lookup... ERROR') - - def get_host(self): - return os.getenv('ES_HOST', 'localhost') + ':' + os.getenv('ES_PORT', '9200') - - def get_kibana_host(self): - return os.getenv('KIBANA_HOST', 'localhost') - - def get_kibana_port(self): - return os.getenv('KIBANA_PORT', '5601') diff --git a/libbeat/tests/system/test_cmd_export_config.py b/libbeat/tests/system/test_cmd_export_config.py new file mode 100644 index 00000000000..954ee2edb06 --- /dev/null +++ b/libbeat/tests/system/test_cmd_export_config.py @@ -0,0 +1,40 @@ +import os +from base import BaseTest + +INTEGRATION_TESTS = os.environ.get('INTEGRATION_TESTS', False) + + +class TestCommandExportConfig(BaseTest): + """ + Test beat command `export config` + """ + + def setUp(self): + super(TestCommandExportConfig, self).setUp() + + self.config = "libbeat.yml" + self.output = os.path.join(self.working_dir, self.config) + + def test_default(self): + """ + Test export config works + """ + self.render_config_template(self.beat_name, self.output, metrics_period='1234') + exit_code = self.run_beat(extra_args=["export", "config"], config=self.config) + + assert exit_code == 0 + assert self.log_contains("filename: mockbeat") + assert self.log_contains("period: 1234") + + def test_config_environment_variable(self): + """ + Test export config works but doesn"t expose environment variable. + """ + self.render_config_template(self.beat_name, self.output, + metrics_period="${METRIC_PERIOD}") + exit_code = self.run_beat(extra_args=["export", "config"], config=self.config, + env={'METRIC_PERIOD': '1234'}) + + assert exit_code == 0 + assert self.log_contains("filename: mockbeat") + assert self.log_contains("period: ${METRIC_PERIOD}") diff --git a/libbeat/tests/system/test_cmd_test.py b/libbeat/tests/system/test_cmd_test.py new file mode 100644 index 00000000000..77b2d8f4b64 --- /dev/null +++ b/libbeat/tests/system/test_cmd_test.py @@ -0,0 +1,79 @@ +from base import BaseTest +import os +import logging +import unittest +from nose.plugins.attrib import attr + + +INTEGRATION_TESTS = os.environ.get('INTEGRATION_TESTS', False) + + +class TestCommandTest(BaseTest): + """ + Test beat subcommands + """ + + def test_config(self): + """ + Test test config command + """ + self.render_config_template("mockbeat", + os.path.join(self.working_dir, "libbeat.yml")) + + exit_code = self.run_beat( + logging_args=[], + extra_args=["test", "config"], + config="libbeat.yml") + + assert exit_code == 0 + assert self.log_contains("Config OK") + + def test_bad_config(self): + """ + Test test config command with bad config + """ + exit_code = self.run_beat( + logging_args=[], + extra_args=["test", "config"], + config="libbeat-missing.yml") + + assert exit_code == 1 + assert self.log_contains("Config OK") is False + + @unittest.skipUnless(INTEGRATION_TESTS, "integration test") + @attr('integration') + def test_output(self): + """ + Test test output works + """ + + self.es_client() + logging.getLogger("elasticsearch").setLevel(logging.ERROR) + + self.render_config_template("mockbeat", + os.path.join(self.working_dir, "mockbeat.yml"), + elasticsearch={"hosts": self.get_elasticsearch_url()}) + exit_code = self.run_beat( + extra_args=["test", "output"], + config="mockbeat.yml") + + assert exit_code == 0 + assert self.log_contains('parse url... OK') + assert self.log_contains('TLS... WARN secure connection disabled') + assert self.log_contains('talk to server... OK') + + def test_wrong_output(self): + """ + Test test wrong output works + """ + self.render_config_template("mockbeat", + os.path.join(self.working_dir, + "mockbeat.yml"), + elasticsearch={"hosts": '["badhost:9200"]'}) + exit_code = self.run_beat( + extra_args=["test", "output"], + config="mockbeat.yml") + + assert exit_code == 1 + assert self.log_contains('parse url... OK') + assert self.log_contains('dns lookup... ERROR') diff --git a/libbeat/tests/system/test_cmd_version.py b/libbeat/tests/system/test_cmd_version.py new file mode 100644 index 00000000000..240b8759668 --- /dev/null +++ b/libbeat/tests/system/test_cmd_version.py @@ -0,0 +1,35 @@ +from base import BaseTest +from elasticsearch import Elasticsearch, TransportError + +import logging +import os + + +INTEGRATION_TESTS = os.environ.get('INTEGRATION_TESTS', False) + + +class TestCommandVersion(BaseTest): + """ + Test beat subcommands + """ + + def setUp(self): + super(BaseTest, self).setUp() + + self.elasticsearch_url = self.get_elasticsearch_url() + print("Using elasticsearch: {}".format(self.elasticsearch_url)) + self.es = Elasticsearch([self.elasticsearch_url]) + logging.getLogger("urllib3").setLevel(logging.WARNING) + logging.getLogger("elasticsearch").setLevel(logging.ERROR) + + def test_version(self): + """ + Test version command + """ + exit_code = self.run_beat( + extra_args=["version"], logging_args=["-v", "-d", "*"]) + assert exit_code == 0 + + assert self.log_contains("mockbeat") + assert self.log_contains("version") + assert self.log_contains("9.9.9") diff --git a/libbeat/tests/system/test_ilm.py b/libbeat/tests/system/test_ilm.py index c765dce28cc..c016fba7129 100644 --- a/libbeat/tests/system/test_ilm.py +++ b/libbeat/tests/system/test_ilm.py @@ -1,267 +1,334 @@ from base import BaseTest +from idxmgmt import IdxMgmt import os -from elasticsearch import Elasticsearch, TransportError from nose.plugins.attrib import attr import unittest import shutil -import logging import datetime +import logging INTEGRATION_TESTS = os.environ.get('INTEGRATION_TESTS', False) -testPolicyName = "libbeat-test-default-policy" - - -class Test(BaseTest): +class TestRunILM(BaseTest): def setUp(self): - super(BaseTest, self).setUp() - - self.elasticsearch_url = self.get_elasticsearch_url() - print("Using elasticsearch: {}".format(self.elasticsearch_url)) - self.es = Elasticsearch([self.elasticsearch_url]) - self.alias_name = "mockbeat-9.9.9" - self.policy_name = testPolicyName - logging.getLogger("urllib3").setLevel(logging.WARNING) - logging.getLogger("elasticsearch").setLevel(logging.ERROR) + super(TestRunILM, self).setUp() + + self.alias_name = self.policy_name = self.index_name = self.beat_name + "-9.9.9" + self.custom_alias = self.beat_name + "_foo" + self.custom_policy = self.beat_name + "_bar" + self.es = self.es_client() + self.idxmgmt = IdxMgmt(self.es, self.index_name) + self.idxmgmt.delete(index=self.custom_alias) + self.idxmgmt.delete(index=self.custom_policy) + self.idxmgmt.delete(index=self.index_name) + + def tearDown(self): + self.idxmgmt.delete(index=self.custom_alias) + self.idxmgmt.delete(index=self.custom_policy) + self.idxmgmt.delete(index=self.index_name) + + def render_config(self, **kwargs): + self.render_config_template( + elasticsearch={"hosts": self.get_elasticsearch_url()}, + es_template_name=self.index_name, + **kwargs + ) @unittest.skipUnless(INTEGRATION_TESTS, "integration test") @attr('integration') - def test_enabled(self): + def test_ilm_default(self): """ - Test ilm enabled + Test ilm default settings to load ilm policy, write alias and ilm template """ + self.render_config() + proc = self.start_beat() + self.wait_until(lambda: self.log_contains("mockbeat start running.")) + self.wait_until(lambda: self.log_contains("ILM policy successfully loaded")) + self.wait_until(lambda: self.log_contains("PublishEvents: 1 events have been published")) + proc.check_kill_and_wait() - self.render_config_template( - ilm={ - "enabled": True, - }, - elasticsearch={ - "hosts": self.get_elasticsearch_url(), - }, - ) + self.idxmgmt.assert_ilm_template_loaded(self.alias_name, self.policy_name, self.alias_name) + self.idxmgmt.assert_alias_created(self.alias_name) + self.idxmgmt.assert_policy_created(self.policy_name) + self.idxmgmt.assert_docs_written_to_alias(self.alias_name) - self.clean() + @unittest.skipUnless(INTEGRATION_TESTS, "integration test") + @attr('integration') + def test_ilm_disabled(self): + """ + Test ilm disabled to not load ilm related components + """ + self.render_config(ilm={"enabled": False}) proc = self.start_beat() self.wait_until(lambda: self.log_contains("mockbeat start running.")) - self.wait_until(lambda: self.log_contains("Set setup.template.name")) self.wait_until(lambda: self.log_contains("PublishEvents: 1 events have been published")) proc.check_kill_and_wait() - # Check if template is loaded with settings - template = self.es.transport.perform_request('GET', '/_template/' + self.alias_name) + self.idxmgmt.assert_index_template_loaded(self.index_name) + self.idxmgmt.assert_alias_not_created(self.alias_name) + self.idxmgmt.assert_policy_not_created(self.policy_name) - print(self.alias_name) - assert template[self.alias_name]["settings"]["index"]["lifecycle"]["name"] == testPolicyName - assert template[self.alias_name]["settings"]["index"]["lifecycle"]["rollover_alias"] == self.alias_name + @unittest.skipUnless(INTEGRATION_TESTS, "integration test") + @attr('integration') + def test_policy_name(self): + """ + Test setting ilm policy name + """ - # Make sure the correct index + alias was created - alias = self.es.transport.perform_request('GET', '/_alias/' + self.alias_name) - d = datetime.datetime.now() - now = d.strftime("%Y.%m.%d") - index_name = self.alias_name + "-" + now + "-000001" - assert index_name in alias - assert alias[index_name]["aliases"][self.alias_name]["is_write_index"] == True + policy_name = self.beat_name + "_foo" + self.render_config(ilm={"enabled": True, "policy_name": policy_name}) - # Asserts that data is actually written to the ILM indices - self.wait_until(lambda: self.es.transport.perform_request( - 'GET', '/' + index_name + '/_search')["hits"]["total"] > 0) + proc = self.start_beat() + self.wait_until(lambda: self.log_contains("mockbeat start running.")) + self.wait_until(lambda: self.log_contains("ILM policy successfully loaded")) + self.wait_until(lambda: self.log_contains("PublishEvents: 1 events have been published")) + proc.check_kill_and_wait() - data = self.es.transport.perform_request('GET', '/' + index_name + '/_search') - assert data["hits"]["total"] > 0 + self.idxmgmt.assert_ilm_template_loaded(self.alias_name, policy_name, self.alias_name) + self.idxmgmt.assert_docs_written_to_alias(self.alias_name) + self.idxmgmt.assert_policy_created(policy_name) @unittest.skipUnless(INTEGRATION_TESTS, "integration test") @attr('integration') def test_rollover_alias(self): """ - Test ilm rollover alias setting + Test settings ilm rollover alias """ - alias_name = "foo" - self.render_config_template( - ilm={ - "enabled": True, - "pattern": "1", - "rollover_alias": alias_name - }, - elasticsearch={ - "hosts": self.get_elasticsearch_url(), - }, - ) - - self.clean(alias_name=alias_name) + self.render_config(ilm={"enabled": True, "rollover_alias": self.custom_alias}) proc = self.start_beat() self.wait_until(lambda: self.log_contains("mockbeat start running.")) - self.wait_until(lambda: self.log_contains("Set setup.template.name")) + self.wait_until(lambda: self.log_contains("ILM policy successfully loaded")) self.wait_until(lambda: self.log_contains("PublishEvents: 1 events have been published")) proc.check_kill_and_wait() - # Make sure the correct index + alias was created - print '/_alias/' + alias_name - logfile = self.beat_name + ".log" - with open(os.path.join(self.working_dir, logfile), "r") as f: - print f.read() - - alias = self.es.transport.perform_request('GET', '/_alias/' + alias_name) - index_name = alias_name + "-1" - assert index_name in alias + self.idxmgmt.assert_ilm_template_loaded(self.custom_alias, self.policy_name, self.custom_alias) + self.idxmgmt.assert_docs_written_to_alias(self.custom_alias) + self.idxmgmt.assert_alias_created(self.custom_alias) @unittest.skipUnless(INTEGRATION_TESTS, "integration test") @attr('integration') def test_pattern(self): """ - Test ilm pattern setting + Test setting ilm pattern """ - self.render_config_template( - ilm={ - "enabled": True, - "pattern": "1" - }, - elasticsearch={ - "hosts": self.get_elasticsearch_url(), - }, - ) - - self.clean() + pattern = "1" + self.render_config(ilm={"enabled": True, "pattern": pattern}) proc = self.start_beat() self.wait_until(lambda: self.log_contains("mockbeat start running.")) - self.wait_until(lambda: self.log_contains("Set setup.template.name")) + self.wait_until(lambda: self.log_contains("ILM policy successfully loaded")) self.wait_until(lambda: self.log_contains("PublishEvents: 1 events have been published")) proc.check_kill_and_wait() - # Make sure the correct index + alias was created - print '/_alias/' + self.alias_name - logfile = self.beat_name + ".log" - with open(os.path.join(self.working_dir, logfile), "r") as f: - print f.read() - - alias = self.es.transport.perform_request('GET', '/_alias/' + self.alias_name) - index_name = self.alias_name + "-1" - assert index_name in alias + self.idxmgmt.assert_ilm_template_loaded(self.alias_name, self.policy_name, self.alias_name) + self.idxmgmt.assert_alias_created(self.alias_name, pattern=pattern) + self.idxmgmt.assert_docs_written_to_alias(self.alias_name, pattern=pattern) @unittest.skipUnless(INTEGRATION_TESTS, "integration test") @attr('integration') def test_pattern_date(self): """ - Test ilm pattern with date inside + Test setting ilm pattern with date """ - self.render_config_template( - ilm={ - "enabled": True, - "pattern": "'{now/d}'" - }, - elasticsearch={ - "hosts": self.get_elasticsearch_url(), - }, - ) - - self.clean() + pattern = "'{now/d}'" + self.render_config(ilm={"enabled": True, "pattern": pattern}) proc = self.start_beat() self.wait_until(lambda: self.log_contains("mockbeat start running.")) - self.wait_until(lambda: self.log_contains("Set setup.template.name")) + self.wait_until(lambda: self.log_contains("ILM policy successfully loaded")) self.wait_until(lambda: self.log_contains("PublishEvents: 1 events have been published")) proc.check_kill_and_wait() - # Make sure the correct index + alias was created - print '/_alias/' + self.alias_name - logfile = self.beat_name + ".log" - with open(os.path.join(self.working_dir, logfile), "r") as f: - print f.read() + resolved_pattern = datetime.datetime.now().strftime("%Y.%m.%d") + + self.idxmgmt.assert_ilm_template_loaded(self.alias_name, self.policy_name, self.alias_name) + self.idxmgmt.assert_alias_created(self.alias_name, pattern=resolved_pattern) + self.idxmgmt.assert_docs_written_to_alias(self.alias_name, pattern=resolved_pattern) + + +class TestCommandSetupILMPolicy(BaseTest): + """ + Test beat command `setup` related to ILM policy + """ + + def setUp(self): + super(TestCommandSetupILMPolicy, self).setUp() + + self.setupCmd = "--ilm-policy" + + self.alias_name = self.policy_name = self.index_name = self.beat_name + "-9.9.9" + self.custom_alias = self.beat_name + "_foo" + self.custom_policy = self.beat_name + "_bar" + self.es = self.es_client() + self.idxmgmt = IdxMgmt(self.es, self.index_name) + self.idxmgmt.delete(index=self.custom_alias) + self.idxmgmt.delete(index=self.custom_policy) + self.idxmgmt.delete(index=self.index_name) + + logging.getLogger("urllib3").setLevel(logging.WARNING) + logging.getLogger("elasticsearch").setLevel(logging.ERROR) - # Make sure the correct index + alias was created - alias = self.es.transport.perform_request('GET', '/_alias/' + self.alias_name) - d = datetime.datetime.now() - now = d.strftime("%Y.%m.%d") - index_name = self.alias_name + "-" + now - assert index_name in alias - assert alias[index_name]["aliases"][self.alias_name]["is_write_index"] == True + def tearDown(self): + self.idxmgmt.delete(index=self.custom_alias) + self.idxmgmt.delete(index=self.custom_policy) + self.idxmgmt.delete(index=self.index_name) + + def render_config(self, **kwargs): + self.render_config_template( + elasticsearch={"hosts": self.get_elasticsearch_url()}, + es_template_name=self.index_name, + **kwargs + ) @unittest.skipUnless(INTEGRATION_TESTS, "integration test") @attr('integration') - def test_setup_ilm_policy(self): + def test_setup_ilm_policy_and_template(self): """ - Test ilm policy setup + Test combination of ilm policy and template setup """ + self.render_config() - self.clean() + exit_code = self.run_beat(logging_args=["-v", "-d", "*"], + extra_args=["setup", self.setupCmd, "--template"]) - shutil.copy(self.beat_path + "/_meta/config.yml", - os.path.join(self.working_dir, "libbeat.yml")) - shutil.copy(self.beat_path + "/fields.yml", - os.path.join(self.working_dir, "fields.yml")) + assert exit_code == 0 + self.idxmgmt.assert_ilm_template_loaded(self.alias_name, self.policy_name, self.alias_name) + self.idxmgmt.assert_docs_written_to_alias(self.alias_name) + self.idxmgmt.assert_alias_created(self.alias_name) + self.idxmgmt.assert_policy_created(self.policy_name) + + @unittest.skipUnless(INTEGRATION_TESTS, "integration test") + @attr('integration') + def test_setup_ilm_default(self): + """ + Test ilm policy setup with default config + """ + self.render_config() - exit_code = self.run_beat( - logging_args=["-v", "-d", "*"], - extra_args=["setup", - "--ilm-policy", - "-path.config", self.working_dir, - "-E", "setup.ilm.policy_name=" + self.policy_name, - "-E", "output.elasticsearch.hosts=['" + self.get_elasticsearch_url() + "']"], - config="libbeat.yml") + exit_code = self.run_beat(logging_args=["-v", "-d", "*"], + extra_args=["setup", self.setupCmd]) assert exit_code == 0 + self.idxmgmt.assert_ilm_template_loaded(self.alias_name, self.policy_name, self.alias_name) + self.idxmgmt.assert_docs_written_to_alias(self.alias_name) + self.idxmgmt.assert_alias_created(self.alias_name) + self.idxmgmt.assert_policy_created(self.policy_name) + + @unittest.skipUnless(INTEGRATION_TESTS, "integration test") + @attr('integration') + def test_setup_ilm_disabled(self): + """ + Test ilm policy setup when ilm disabled + """ + self.render_config() + + exit_code = self.run_beat(logging_args=["-v", "-d", "*"], + extra_args=["setup", self.setupCmd, + "-E", "setup.ilm.enabled=false"]) - policy = self.es.transport.perform_request('GET', "/_ilm/policy/" + self.policy_name) - assert self.policy_name in policy + assert exit_code == 0 + self.idxmgmt.assert_index_template_loaded(self.index_name) + self.idxmgmt.assert_alias_not_created(self.alias_name) + self.idxmgmt.assert_policy_not_created(self.policy_name) @unittest.skipUnless(INTEGRATION_TESTS, "integration test") @attr('integration') - def test_export_ilm_policy(self): + def test_policy_name(self): """ - Test ilm policy export + Test ilm policy setup when policy_name is configured """ + self.render_config() + + exit_code = self.run_beat(logging_args=["-v", "-d", "*"], + extra_args=["setup", self.setupCmd, + "-E", "setup.ilm.policy_name=" + self.custom_policy]) - self.clean() + assert exit_code == 0 + self.idxmgmt.assert_ilm_template_loaded(self.alias_name, self.custom_policy, self.alias_name) + self.idxmgmt.assert_policy_created(self.custom_policy) - shutil.copy(self.beat_path + "/_meta/config.yml", - os.path.join(self.working_dir, "libbeat.yml")) - shutil.copy(self.beat_path + "/fields.yml", - os.path.join(self.working_dir, "fields.yml")) + @unittest.skipUnless(INTEGRATION_TESTS, "integration test") + @attr('integration') + def test_rollover_alias(self): + """ + Test ilm policy setup when rollover_alias is configured + """ + self.render_config() - exit_code = self.run_beat( - logging_args=["-v", "-d", "*"], - extra_args=["export", - "ilm-policy", - ], - config="libbeat.yml") + exit_code = self.run_beat(logging_args=["-v", "-d", "*"], + extra_args=["setup", self.setupCmd, + "-E", "setup.ilm.rollover_alias=" + self.custom_alias]) assert exit_code == 0 + self.idxmgmt.assert_ilm_template_loaded(self.custom_alias, self.policy_name, self.custom_alias) + self.idxmgmt.assert_docs_written_to_alias(self.custom_alias) + self.idxmgmt.assert_alias_created(self.custom_alias) + +class TestCommandExportILMPolicy(BaseTest): + """ + Test beat command `export ilm-policy` + """ + + def setUp(self): + super(TestCommandExportILMPolicy, self).setUp() + + self.config = "libbeat.yml" + self.output = os.path.join(self.working_dir, self.config) + shutil.copy(os.path.join(self.beat_path, "fields.yml"), self.output) + self.policy_name = self.beat_name + "-9.9.9" + self.cmd = "ilm-policy" + + def assert_log_contains_policy(self, policy): + assert self.log_contains('ILM policy successfully loaded.') + assert self.log_contains(policy) assert self.log_contains('"max_age": "30d"') assert self.log_contains('"max_size": "50gb"') - def clean(self, alias_name=""): - - if alias_name == "": - alias_name = self.alias_name - - # Delete existing indices and aliases with it policy - try: - self.es.transport.perform_request('DELETE', "/" + alias_name + "*") - except: - pass - - # Delete any existing policy - try: - self.es.transport.perform_request('DELETE', "/_ilm/policy/" + self.policy_name) - except: - pass - - # Delete templates - try: - self.es.transport.perform_request('DELETE', "/_template/mockbeat*") - except: - pass - - # Delete indices - try: - self.es.transport.perform_request('DELETE', "/foo*,mockbeat*") - except: - pass + def assert_log_contains_write_alias(self): + assert self.log_contains('Write alias successfully generated.') + + def test_default(self): + """ + Test ilm-policy export with default config + """ + + exit_code = self.run_beat(extra_args=["export", self.cmd], + config=self.config) + + assert exit_code == 0 + self.assert_log_contains_policy(self.policy_name) + self.assert_log_contains_write_alias() + + def test_load_disabled(self): + """ + Test ilm-policy export when ilm disabled in config + """ + + exit_code = self.run_beat(extra_args=["export", self.cmd, "-E", "setup.ilm.enabled=false"], + config=self.config) + + assert exit_code == 0 + self.assert_log_contains_policy(self.policy_name) + self.assert_log_contains_write_alias() + + def test_changed_policy_name(self): + """ + Test ilm-policy export when policy name is changed + + """ + policy_name = "foo" + + exit_code = self.run_beat(extra_args=["export", self.cmd, "-E", "setup.ilm.policy_name=" + policy_name], + config=self.config) + + assert exit_code == 0 + self.assert_log_contains_policy(policy_name) + self.assert_log_contains_write_alias() diff --git a/libbeat/tests/system/test_template.py b/libbeat/tests/system/test_template.py index 47e9162ac60..962f5aed419 100644 --- a/libbeat/tests/system/test_template.py +++ b/libbeat/tests/system/test_template.py @@ -1,8 +1,10 @@ from base import BaseTest +from idxmgmt import IdxMgmt import os -from elasticsearch import Elasticsearch, TransportError from nose.plugins.attrib import attr import unittest +import shutil +import logging INTEGRATION_TESTS = os.environ.get('INTEGRATION_TESTS', False) @@ -21,7 +23,7 @@ def test_index_modified(self): assert exit_code == 1 assert self.log_contains( - "setup.template.name and setup.template.pattern have to be set if index name is modified") is True + "setup.template.name and setup.template.pattern have to be set if index name is modified") def test_index_not_modified(self): """ @@ -48,7 +50,7 @@ def test_index_modified_no_pattern(self): assert exit_code == 1 assert self.log_contains( - "setup.template.name and setup.template.pattern have to be set if index name is modified") is True + "setup.template.name and setup.template.pattern have to be set if index name is modified") def test_index_modified_no_name(self): """ @@ -63,7 +65,7 @@ def test_index_modified_no_name(self): assert exit_code == 1 assert self.log_contains( - "setup.template.name and setup.template.pattern have to be set if index name is modified") is True + "setup.template.name and setup.template.pattern have to be set if index name is modified") def test_index_with_pattern_name(self): """ @@ -105,9 +107,277 @@ def test_json_template(self): self.wait_until(lambda: self.log_contains("template with name 'bla' loaded")) proc.check_kill_and_wait() - es = Elasticsearch([self.get_elasticsearch_url()]) + es = self.es_client() result = es.transport.perform_request('GET', '/_template/bla') assert len(result) == 1 def get_host(self): return os.getenv('ES_HOST', 'localhost') + ':' + os.getenv('ES_PORT', '9200') + + +class TestRunTemplate(BaseTest): + """ + Test run cmd with focus on template setup + """ + + def setUp(self): + super(TestRunTemplate, self).setUp() + # auto-derived default settings, if nothing else is set + self.index_name = self.beat_name + "-9.9.9" + + self.es = self.es_client() + self.idxmgmt = IdxMgmt(self.es, self.index_name) + self.idxmgmt.delete(index=self.index_name) + + def tearDown(self): + self.idxmgmt.delete(index=self.index_name) + + def render_config(self, **kwargs): + self.render_config_template( + elasticsearch={"hosts": self.get_elasticsearch_url()}, + **kwargs + ) + + @unittest.skipUnless(INTEGRATION_TESTS, "integration test") + @attr('integration') + def test_template_default(self): + """ + Test run cmd with default settings for template + """ + self.render_config() + proc = self.start_beat() + self.wait_until(lambda: self.log_contains("mockbeat start running.")) + self.wait_until(lambda: self.log_contains("template with name 'mockbeat-9.9.9' loaded")) + self.wait_until(lambda: self.log_contains("PublishEvents: 1 events have been published")) + proc.check_kill_and_wait() + + self.idxmgmt.assert_ilm_template_loaded(self.index_name, self.index_name, self.index_name) + self.idxmgmt.assert_alias_created(self.index_name) + self.idxmgmt.assert_docs_written_to_alias(self.index_name) + + @unittest.skipUnless(INTEGRATION_TESTS, "integration test") + @attr('integration') + def test_template_disabled(self): + """ + Test run cmd does not load template when disabled in config + """ + self.render_config() + proc = self.start_beat(extra_args=["-E", "setup.template.enabled=false"]) + self.wait_until(lambda: self.log_contains("mockbeat start running.")) + self.wait_until(lambda: self.log_contains("PublishEvents: 1 events have been published")) + proc.check_kill_and_wait() + + self.idxmgmt.assert_index_template_not_loaded(self.index_name) + + +class TestCommandSetupTemplate(BaseTest): + """ + Test beat command `setup` with focus on template + """ + + def setUp(self): + super(TestCommandSetupTemplate, self).setUp() + + # auto-derived default settings, if nothing else is set + self.setupCmd = "--template" + self.index_name = self.beat_name + "-9.9.9" + self.custom_alias = self.beat_name + "_foo" + + self.es = self.es_client() + self.idxmgmt = IdxMgmt(self.es, self.index_name) + self.idxmgmt.delete(index=self.custom_alias) + self.idxmgmt.delete(index=self.index_name) + logging.getLogger("urllib3").setLevel(logging.WARNING) + logging.getLogger("elasticsearch").setLevel(logging.ERROR) + + def tearDown(self): + self.idxmgmt.delete(index=self.custom_alias) + self.idxmgmt.delete(index=self.index_name) + + def render_config(self, **kwargs): + self.render_config_template( + elasticsearch={"hosts": self.get_elasticsearch_url()}, + **kwargs + ) + + @unittest.skipUnless(INTEGRATION_TESTS, "integration test") + @attr('integration') + def test_setup(self): + """ + Test setup cmd with template and ilm-policy subcommands + """ + self.render_config() + exit_code = self.run_beat(logging_args=["-v", "-d", "*"], + extra_args=["setup", self.setupCmd, "--ilm-policy"]) + + assert exit_code == 0 + self.idxmgmt.assert_ilm_template_loaded(self.index_name, self.index_name, self.index_name) + self.idxmgmt.assert_alias_created(self.index_name) + self.idxmgmt.assert_policy_created(self.index_name) + + @unittest.skipUnless(INTEGRATION_TESTS, "integration test") + @attr('integration') + def test_setup_template_default(self): + """ + Test template setup with default config + """ + self.render_config() + exit_code = self.run_beat(logging_args=["-v", "-d", "*"], + extra_args=["setup", self.setupCmd]) + + assert exit_code == 0 + self.idxmgmt.assert_ilm_template_loaded(self.index_name, self.index_name, self.index_name) + self.idxmgmt.assert_index_template_index_pattern(self.index_name, [self.index_name + "-*"]) + + # when running `setup --template` + # write_alias and rollover_policy related to ILM are also created + self.idxmgmt.assert_alias_created(self.index_name) + self.idxmgmt.assert_policy_created(self.index_name) + + @unittest.skipUnless(INTEGRATION_TESTS, "integration test") + @attr('integration') + def test_setup_template_disabled(self): + """ + Test template setup when ilm disabled + """ + self.render_config() + exit_code = self.run_beat(logging_args=["-v", "-d", "*"], + extra_args=["setup", self.setupCmd, + "-E", "setup.template.enabled=false"]) + + assert exit_code == 0 + self.idxmgmt.assert_index_template_not_loaded(self.index_name) + + # when running `setup --template` and `setup.template.enabled=false` + # write_alias and rollover_policy related to ILM are still created + self.idxmgmt.assert_alias_created(self.index_name) + self.idxmgmt.assert_policy_created(self.index_name) + + @unittest.skipUnless(INTEGRATION_TESTS, "integration test") + @attr('integration') + def test_setup_template_with_opts(self): + """ + Test template setup with config options + """ + self.render_config() + exit_code = self.run_beat(logging_args=["-v", "-d", "*"], + extra_args=["setup", self.setupCmd, + "-E", "setup.ilm.enabled=false", + "-E", "setup.template.settings.index.number_of_shards=2"]) + + assert exit_code == 0 + self.idxmgmt.assert_index_template_loaded(self.index_name) + + # check that settings are overwritten + resp = self.es.transport.perform_request('GET', '/_template/' + self.index_name) + assert self.index_name in resp + index = resp[self.index_name]["settings"]["index"] + assert index["number_of_shards"] == "2", index["number_of_shards"] + + @unittest.skipUnless(INTEGRATION_TESTS, "integration test") + @attr('integration') + def test_setup_template_with_ilm_changed_pattern(self): + """ + Test template setup with changed ilm.rollover_alias config + """ + self.render_config() + exit_code = self.run_beat(logging_args=["-v", "-d", "*"], + extra_args=["setup", self.setupCmd, + "-E", "setup.ilm.rollover_alias=" + self.custom_alias]) + + assert exit_code == 0 + self.idxmgmt.assert_ilm_template_loaded(self.custom_alias, self.index_name, self.custom_alias) + self.idxmgmt.assert_index_template_index_pattern(self.custom_alias, [self.custom_alias + "-*"]) + + @unittest.skipUnless(INTEGRATION_TESTS, "integration test") + @attr('integration') + def test_template_created_on_ilm_policy_created(self): + """ + Test template setup overwrites template when new ilm policy is created + """ + + # ensure template with ilm rollover_alias name is created, but ilm policy not yet + self.render_config() + exit_code = self.run_beat(logging_args=["-v", "-d", "*"], + extra_args=["setup", self.setupCmd, + "-E", "setup.ilm.enabled=false", + "-E", "setup.template.name=" + self.custom_alias, + "-E", "setup.template.pattern=" + self.custom_alias + "*"]) + assert exit_code == 0 + self.idxmgmt.assert_index_template_loaded(self.custom_alias) + self.idxmgmt.assert_policy_not_created(self.index_name) + + # ensure ilm policy is created, triggering overwriting existing template + exit_code = self.run_beat(extra_args=["setup", self.setupCmd, + "-E", "setup.template.overwrite=false", + "-E", "setup.template.settings.index.number_of_shards=2", + "-E", "setup.ilm.rollover_alias=" + self.custom_alias]) + assert exit_code == 0 + self.idxmgmt.assert_ilm_template_loaded(self.custom_alias, self.index_name, self.custom_alias) + self.idxmgmt.assert_policy_created(self.index_name) + # check that template was overwritten + resp = self.es.transport.perform_request('GET', '/_template/' + self.custom_alias) + assert self.custom_alias in resp + index = resp[self.custom_alias]["settings"]["index"] + assert index["number_of_shards"] == "2", index["number_of_shards"] + + +class TestCommandExportTemplate(BaseTest): + """ + Test beat command `export template` + """ + + def setUp(self): + super(TestCommandExportTemplate, self).setUp() + + self.config = "libbeat.yml" + self.output = os.path.join(self.working_dir, self.config) + shutil.copy(os.path.join(self.beat_path, "fields.yml"), self.output) + self.template_name = self.beat_name + "-9.9.9" + + def assert_log_contains_template(self, template, index_pattern): + assert self.log_contains('Loaded index template') + assert self.log_contains(template) + assert self.log_contains(index_pattern) + + def test_default(self): + """ + Test export template works + """ + self.render_config_template(self.beat_name, self.output, + fields=self.output) + exit_code = self.run_beat( + extra_args=["export", "template"], + config=self.config) + + assert exit_code == 0 + self.assert_log_contains_template(self.template_name, self.template_name + "-*") + + def test_changed_index_pattern(self): + """ + Test export template with changed index pattern + """ + self.render_config_template(self.beat_name, self.output, + fields=self.output) + alias_name = "mockbeat-ilm-index-pattern" + + exit_code = self.run_beat( + extra_args=["export", "template", + "-E", "setup.ilm.rollover_alias=" + alias_name], + config=self.config) + + assert exit_code == 0 + self.assert_log_contains_template(self.template_name, alias_name + "-*") + + def test_load_disabled(self): + """ + Test template also exported when disabled in config + """ + self.render_config_template(self.beat_name, self.output, + fields=self.output) + exit_code = self.run_beat( + extra_args=["export", "template", "-E", "setup.template.enabled=false"], + config=self.config) + + assert exit_code == 0 + self.assert_log_contains_template(self.template_name, self.template_name + "-*")