diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index ce0647b4568..43cd2635c26 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -9,6 +9,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/outputs/elasticsearch" cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/crawler" @@ -18,7 +19,10 @@ import ( "github.com/elastic/beats/filebeat/spooler" ) -var once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF") +var ( + once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF") + setup = flag.Bool("setup", false, "Run the setup phase for the modules") +) // Filebeat is a beater object. Contains all objects needed to run the beat type Filebeat struct { @@ -67,11 +71,33 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { return fb, nil } +// Setup is called on user request (the -setup flag) to do the initial Beat setup. +func (fb *Filebeat) Setup(b *beat.Beat) error { + esConfig := b.Config.Output["elasticsearch"] + if esConfig == nil || !esConfig.Enabled() { + return fmt.Errorf("Setup requested but the Elasticsearch output is not configured/enabled") + } + esClient, err := elasticsearch.NewConnectedClient(esConfig) + if err != nil { + return fmt.Errorf("Error creating ES client: %v", err) + } + defer esClient.Close() + + return fb.moduleRegistry.Setup(esClient) +} + // Run allows the beater to be run as a beat. func (fb *Filebeat) Run(b *beat.Beat) error { var err error config := fb.config + if *setup { + err = fb.Setup(b) + if err != nil { + return err + } + } + waitFinished := newSignalWait() waitEvents := newSignalWait() diff --git a/filebeat/filebeat.py b/filebeat/filebeat.py deleted file mode 100755 index e398c471074..00000000000 --- a/filebeat/filebeat.py +++ /dev/null @@ -1,171 +0,0 @@ -#!/usr/bin/env python -import argparse -import sys -import os -import yaml -import requests -import tempfile -import subprocess -import socket -from jinja2 import Template - - -def main(): - parser = argparse.ArgumentParser( - description="PROTOTYPE: start filebeat with a module configuration") - parser.add_argument("--modules", default="", - help="From branch") - parser.add_argument("--es", default="http://localhost:9200", - help="Elasticsearch URL") - parser.add_argument("--index", default=None, - help="Elasticsearch index") - parser.add_argument("--registry", default=None, - help="Registry file to use") - parser.add_argument("-M", nargs="*", type=str, default=None, - help="Variables overrides. e.g. path=/test") - parser.add_argument("--once", action="store_true", - help="Run filebeat with the -once flag") - - args = parser.parse_args() - print args - - # changing directory because we use paths relative to the binary - os.chdir(os.path.dirname(sys.argv[0])) - - modules = args.modules.split(",") - if len(modules) == 0: - print("You need to specify at least a module") - sys.exit(1) - - # load_dashboards(args) - load_datasets(args, modules) - - -def load_dashboards(args): - cmd = ["../libbeat/dashboards/import_dashboards", - "-dir", "_meta/kibana", - "-es", args.es] - subprocess.Popen(cmd).wait() - - -def load_datasets(args, modules): - for module in modules: - path = os.path.join("module", module) - if not os.path.isdir(path): - print("Module {} not found".format(module)) - sys.exit(1) - print("Found module {} in {}".format(module, path)) - - filesets = [name for name in os.listdir(path) if - os.path.isfile(os.path.join(path, name, "manifest.yml"))] - - print("Found filesets: {}".format(filesets)) - - for fileset in filesets: - load_fileset(args, module, fileset, - os.path.join(path, fileset)) - - run_filebeat(args) - - -def load_fileset(args, module, fileset, path): - manifest = yaml.load(file(os.path.join(path, "manifest.yml"), "r")) - var = evaluate_vars(args, manifest["var"], module, fileset) - var["beat"] = dict(module=module, fileset=fileset, path=path, args=args) - print("Evaluated variables: {}".format(var)) - - load_pipeline(var, manifest["ingest_pipeline"]) - - -def evaluate_vars(args, var_in, module, fileset): - var = { - "builtin": get_builtin_vars() - } - for vals in var_in: - name = vals["name"] - var[name] = vals["default"] - if sys.platform == "darwin" and "os.darwin" in vals: - var[name] = vals["os.darwin"] - elif sys.platform == "windows" and "os.windows" in vals: - var[name] = vals["os.windows"] - - if isinstance(var[name], basestring): - var[name] = apply_template(var[name], var) - elif isinstance(var[name], list): - # only supports array of strings atm - var[name] = [apply_template(x, var) for x in var[name]] - - return var - - -def apply_template(tpl, var): - tpl = tpl.replace("{{.", "{{") # Go templates - return Template(tpl).render(var) - - -def get_builtin_vars(): - host = socket.gethostname() - hostname, _, domain = host.partition(".") - # separate the domain - return { - "hostname": hostname, - "domain": domain - } - - -def load_pipeline(var, pipeline): - path = os.path.join(var["beat"]["path"], apply_template(pipeline, var)) - print("Loading ingest pipeline: {}".format(path)) - var["beat"]["pipeline_id"] = var["beat"]["module"] + '-' + var["beat"]["fileset"] + \ - '-' + os.path.splitext(os.path.basename(path))[0] - print("Pipeline id: {}".format(var["beat"]["pipeline_id"])) - - with open(path, "r") as f: - contents = f.read() - - r = requests.put("{}/_ingest/pipeline/{}" - .format(var["beat"]["args"].es, - var["beat"]["pipeline_id"]), - data=contents) - if r.status_code >= 300: - print("Error posting pipeline: {}".format(r.text)) - sys.exit(1) - - -def run_filebeat(args): - cfg_template = """ -output.elasticsearch.hosts: ["{{es}}"] -output.elasticsearch.pipeline: "%{[fields.pipeline_id]}" -""" - if args.index: - cfg_template += "\noutput.elasticsearch.index: {}".format(args.index) - - if args.once: - cfg_template += "\nfilebeat.idle_timeout: 0.5s" - - if args.registry: - cfg_template += "\nfilebeat.registry_file: {}".format(args.registry) - - fd, fname = tempfile.mkstemp(suffix=".yml", prefix="filebeat-", - text=True) - with open(fname, "w") as cfgfile: - cfgfile.write(Template(cfg_template).render( - dict(es=args.es))) - print("Wrote configuration file: {}".format(cfgfile.name)) - os.close(fd) - - cmd = ["./filebeat.test", "-systemTest", - "-modules", args.modules, - "-e", "-c", cfgfile.name, "-d", "*"] - for override in args.M: - cmd.extend(["-M", override]) - if args.once: - cmd.extend(["-M", "*.*.prospector.close_eof=true"]) - cmd.append("-once") - print("Starting filebeat: " + " ".join(cmd)) - - subprocess.Popen(cmd).wait() - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/filebeat/fileset/fileset.go b/filebeat/fileset/fileset.go index def198ef86a..0473f269981 100644 --- a/filebeat/fileset/fileset.go +++ b/filebeat/fileset/fileset.go @@ -7,6 +7,7 @@ package fileset import ( "bytes" + "encoding/json" "fmt" "io/ioutil" "os" @@ -242,7 +243,31 @@ func (fs *Fileset) getPipelineID() (string, error) { return "", fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err) } - return fmt.Sprintf("%s-%s-%s", fs.mcfg.Module, fs.name, removeExt(filepath.Base(path))), nil + return formatPipelineID(fs.mcfg.Module, fs.name, path), nil +} + +func (fs *Fileset) GetPipeline() (pipelineID string, content map[string]interface{}, err error) { + path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline) + if err != nil { + return "", nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err) + } + + f, err := os.Open(filepath.Join(fs.modulePath, fs.name, path)) + if err != nil { + return "", nil, fmt.Errorf("Error reading pipeline file %s: %v", path, err) + } + + dec := json.NewDecoder(f) + err = dec.Decode(&content) + if err != nil { + return "", nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err) + } + return formatPipelineID(fs.mcfg.Module, fs.name, path), content, nil +} + +// formatPipelineID generates the ID to be used for the pipeline ID in Elasticsearch +func formatPipelineID(module, fileset, path string) string { + return fmt.Sprintf("%s-%s-%s", module, fileset, removeExt(filepath.Base(path))) } // removeExt returns the file name without the extension. If no dot is found, diff --git a/filebeat/fileset/fileset_test.go b/filebeat/fileset/fileset_test.go index 6baa7a02142..b65f04d03d2 100644 --- a/filebeat/fileset/fileset_test.go +++ b/filebeat/fileset/fileset_test.go @@ -1,3 +1,5 @@ +// +build !integration + package fileset import ( @@ -176,7 +178,18 @@ func TestGetProspectorConfigNginxOverrides(t *testing.T) { assert.True(t, cfg.HasField("paths")) assert.True(t, cfg.HasField("exclude_files")) assert.True(t, cfg.HasField("close_eof")) - pipeline_id := fs.vars["beat"].(map[string]interface{})["pipeline_id"] - assert.Equal(t, "nginx-access-with_plugins", pipeline_id) + pipelineID := fs.vars["beat"].(map[string]interface{})["pipeline_id"] + assert.Equal(t, "nginx-access-with_plugins", pipelineID) + +} +func TestGetPipelineNginx(t *testing.T) { + fs := getModuleForTesting(t, "nginx", "access") + assert.NoError(t, fs.Read()) + + pipelineID, content, err := fs.GetPipeline() + assert.NoError(t, err) + assert.Equal(t, "nginx-access-with_plugins", pipelineID) + assert.Contains(t, content, "description") + assert.Contains(t, content, "processors") } diff --git a/filebeat/fileset/modules.go b/filebeat/fileset/modules.go index 67adfb8eed6..d1dfa705504 100644 --- a/filebeat/fileset/modules.go +++ b/filebeat/fileset/modules.go @@ -240,3 +240,36 @@ func (reg *ModuleRegistry) GetProspectorConfigs() ([]*common.Config, error) { } return result, nil } + +// PipelineLoader is a subset of the Elasticsearch client API capable of loading +// the pipelines. +type PipelineLoader interface { + LoadJSON(path string, json map[string]interface{}) error +} + +// Setup is called on -setup and loads the pipelines for each configured fileset. +func (reg *ModuleRegistry) Setup(esClient PipelineLoader) error { + for module, filesets := range reg.registry { + for name, fileset := range filesets { + pipelineID, content, err := fileset.GetPipeline() + if err != nil { + return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err) + } + err = loadPipeline(esClient, pipelineID, content) + if err != nil { + return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err) + } + } + } + return nil +} + +func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}) error { + path := "/_ingest/pipeline/" + pipelineID + err := esClient.LoadJSON(path, content) + if err != nil { + return fmt.Errorf("couldn't load template: %v", err) + } + logp.Info("Elasticsearch pipeline with ID '%s' loaded", pipelineID) + return nil +} diff --git a/filebeat/fileset/modules_integration_test.go b/filebeat/fileset/modules_integration_test.go new file mode 100644 index 00000000000..2a6167ea802 --- /dev/null +++ b/filebeat/fileset/modules_integration_test.go @@ -0,0 +1,58 @@ +// +build integration + +package fileset + +import ( + "path/filepath" + "testing" + + "github.com/elastic/beats/libbeat/outputs/elasticsearch" + "github.com/stretchr/testify/assert" +) + +func TestLoadPipeline(t *testing.T) { + client := elasticsearch.GetTestingElasticsearch() + client.Request("DELETE", "/_ingest/pipeline/my-pipeline-id", "", nil, nil) + + content := map[string]interface{}{ + "description": "describe pipeline", + "processors": []map[string]interface{}{ + { + "set": map[string]interface{}{ + "field": "foo", + "value": "bar", + }, + }, + }, + } + + err := loadPipeline(client, "my-pipeline-id", content) + assert.NoError(t, err) + + status, _, _ := client.Request("GET", "/_ingest/pipeline/my-pipeline-id", "", nil, nil) + assert.Equal(t, 200, status) +} + +func TestSetupNginx(t *testing.T) { + client := elasticsearch.GetTestingElasticsearch() + client.Request("DELETE", "/_ingest/pipeline/nginx-access-with_plugins", "", nil, nil) + client.Request("DELETE", "/_ingest/pipeline/nginx-error-pipeline", "", nil, nil) + + modulesPath, err := filepath.Abs("../module") + assert.NoError(t, err) + + configs := []ModuleConfig{ + ModuleConfig{Module: "nginx"}, + } + + reg, err := newModuleRegistry(modulesPath, configs, nil) + assert.NoError(t, err) + + err = reg.Setup(client) + assert.NoError(t, err) + + status, _, _ := client.Request("GET", "/_ingest/pipeline/nginx-access-with_plugins", "", nil, nil) + assert.Equal(t, 200, status) + status, _, _ = client.Request("GET", "/_ingest/pipeline/nginx-error-pipeline", "", nil, nil) + assert.Equal(t, 200, status) +} diff --git a/filebeat/fileset/modules_test.go b/filebeat/fileset/modules_test.go index cd3e2794015..b44a93d427e 100644 --- a/filebeat/fileset/modules_test.go +++ b/filebeat/fileset/modules_test.go @@ -1,3 +1,5 @@ +// +build !integration + package fileset import ( diff --git a/filebeat/tests/system/config/filebeat_modules.yml.j2 b/filebeat/tests/system/config/filebeat_modules.yml.j2 new file mode 100644 index 00000000000..87e2937e35c --- /dev/null +++ b/filebeat/tests/system/config/filebeat_modules.yml.j2 @@ -0,0 +1,6 @@ +filebeat.idle_timeout: 0.5s +filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("registry")}} + +output.elasticsearch.hosts: ["{{ elasticsearch_url }}"] +output.elasticsearch.index: {{ index_name }} +output.elasticsearch.pipeline: "%{[fields.pipeline_id]}" diff --git a/filebeat/tests/system/test_modules.py b/filebeat/tests/system/test_modules.py index 5451659ec7c..845b6dff02c 100644 --- a/filebeat/tests/system/test_modules.py +++ b/filebeat/tests/system/test_modules.py @@ -21,7 +21,9 @@ def init(self): "/../../../../module") self.filebeat = os.path.abspath(self.working_dir + - "/../../../../filebeat.py") + "/../../../../filebeat.test") + + self.index_name = "test-filebeat-modules" @unittest.skipIf(not INTEGRATION_TESTS or os.getenv("TESTING_ENVIRONMENT") == "2x", @@ -34,6 +36,14 @@ def test_modules(self): else: modules = os.listdir(self.modules_path) + # generate a minimal configuration + cfgfile = os.path.join(self.working_dir, "filebeat.yml") + self.render_config_template( + template="filebeat_modules.yml.j2", + output=cfgfile, + index_name=self.index_name, + elasticsearch_url=self.elasticsearch_url) + for module in modules: path = os.path.join(self.modules_path, module) filesets = [name for name in os.listdir(path) if @@ -47,28 +57,28 @@ def test_modules(self): self.run_on_file( module=module, fileset=fileset, - test_file=test_file) + test_file=test_file, + cfgfile=cfgfile) - def run_on_file(self, module, fileset, test_file): + def run_on_file(self, module, fileset, test_file, cfgfile): print("Testing {}/{} on {}".format(module, fileset, test_file)) - index_name = "test-filebeat-modules" try: - self.es.indices.delete(index=index_name) + self.es.indices.delete(index=self.index_name) except: pass cmd = [ - self.filebeat, - "--once", - "--modules={}".format(module), + self.filebeat, "-systemTest", + "-e", "-d", "*", "-once", "-setup", + "-c", cfgfile, + "-modules={}".format(module), "-M", "{module}.{fileset}.var.paths=[{test_file}]".format( module=module, fileset=fileset, test_file=test_file), - "--es", self.elasticsearch_url, - "--index", index_name, - "--registry", self.working_dir + "/registry" + "-M", "*.*.prospector.close_eof=true", ] output = open(os.path.join(self.working_dir, "output.log"), "ab") + output.write(" ".join(cmd) + "\n") subprocess.Popen(cmd, stdin=None, stdout=output, @@ -76,10 +86,10 @@ def run_on_file(self, module, fileset, test_file): bufsize=0).wait() # Make sure index exists - self.wait_until(lambda: self.es.indices.exists(index_name)) + self.wait_until(lambda: self.es.indices.exists(self.index_name)) - self.es.indices.refresh(index=index_name) - res = self.es.search(index=index_name, + self.es.indices.refresh(index=self.index_name) + res = self.es.search(index=self.index_name, body={"query": {"match_all": {}}}) objects = [o["_source"] for o in res["hits"]["hits"]] assert len(objects) > 0 diff --git a/libbeat/outputs/elasticsearch/api.go b/libbeat/outputs/elasticsearch/api.go index 97687570b73..f38687a853a 100644 --- a/libbeat/outputs/elasticsearch/api.go +++ b/libbeat/outputs/elasticsearch/api.go @@ -192,5 +192,5 @@ func (es *Connection) apiCall( if err != nil { return 0, nil, err } - return es.request(method, path, pipeline, params, body) + return es.Request(method, path, pipeline, params, body) } diff --git a/libbeat/outputs/elasticsearch/api_test.go b/libbeat/outputs/elasticsearch/api_test.go index 10d04d21104..2e57abebc3b 100644 --- a/libbeat/outputs/elasticsearch/api_test.go +++ b/libbeat/outputs/elasticsearch/api_test.go @@ -2,49 +2,11 @@ package elasticsearch import ( - "os" "testing" - "time" - "github.com/elastic/beats/libbeat/outputs/outil" "github.com/stretchr/testify/assert" ) -const ElasticsearchDefaultHost = "localhost" -const ElasticsearchDefaultPort = "9200" - -func GetEsPort() string { - port := os.Getenv("ES_PORT") - - if len(port) == 0 { - port = ElasticsearchDefaultPort - } - return port -} - -// Returns -func GetEsHost() string { - - host := os.Getenv("ES_HOST") - - if len(host) == 0 { - host = ElasticsearchDefaultHost - } - - return host -} - -func GetTestingElasticsearch() *Client { - var address = "http://" + GetEsHost() + ":" + GetEsPort() - username := os.Getenv("ES_USER") - pass := os.Getenv("ES_PASS") - client := newTestClientAuth(address, username, pass) - - // Load version number - client.Connect(3 * time.Second) - return client -} - func GetValidQueryResult() QueryResult { result := QueryResult{ Ok: true, @@ -172,18 +134,3 @@ func TestReadSearchResult_invalid(t *testing.T) { func newTestClient(url string) *Client { return newTestClientAuth(url, "", "") } - -func newTestClientAuth(url, user, pass string) *Client { - client, err := NewClient(ClientSettings{ - URL: url, - Index: outil.MakeSelector(), - Username: user, - Password: pass, - Timeout: 60 * time.Second, - CompressionLevel: 3, - }, nil) - if err != nil { - panic(err) - } - return client -} diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 649bc2db05a..2b2322af9a1 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -27,6 +27,7 @@ type Client struct { index outil.Selector pipeline *outil.Selector params map[string]string + timeout time.Duration // buffered bulk requests bulkRequ *bulkRequest @@ -173,6 +174,7 @@ func NewClient( index: s.Index, pipeline: pipeline, params: params, + timeout: s.Timeout, bulkRequ: bulkRequ, @@ -584,7 +586,7 @@ func (client *Client) LoadTemplate(templateName string, template map[string]inte } func (client *Client) LoadJSON(path string, json map[string]interface{}) error { - status, _, err := client.request("PUT", path, "", nil, json) + status, _, err := client.Request("PUT", path, "", nil, json) if err != nil { return fmt.Errorf("couldn't load json. Error: %s", err) } @@ -599,7 +601,7 @@ func (client *Client) LoadJSON(path string, json map[string]interface{}) error { // and only if Elasticsearch returns with HTTP status code 200. func (client *Client) CheckTemplate(templateName string) bool { - status, _, _ := client.request("HEAD", "/_template/"+templateName, "", nil, nil) + status, _, _ := client.Request("HEAD", "/_template/"+templateName, "", nil, nil) if status != 200 { return false @@ -657,7 +659,7 @@ func (conn *Connection) Close() error { return nil } -func (conn *Connection) request( +func (conn *Connection) Request( method, path string, pipeline string, params map[string]string, diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index d7857203d46..eedc4272906 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -62,7 +62,7 @@ func TestLoadTemplate(t *testing.T) { assert.True(t, client.CheckTemplate(templateName)) // Delete template again to clean up - client.request("DELETE", "/_template/"+templateName, "", nil, nil) + client.Request("DELETE", "/_template/"+templateName, "", nil, nil) // Make sure it was removed assert.False(t, client.CheckTemplate(templateName)) @@ -134,7 +134,7 @@ func TestLoadBeatsTemplate(t *testing.T) { assert.True(t, client.CheckTemplate(templateName)) // Delete template again to clean up - client.request("DELETE", "/_template/"+templateName, "", nil, nil) + client.Request("DELETE", "/_template/"+templateName, "", nil, nil) // Make sure it was removed assert.False(t, client.CheckTemplate(templateName)) @@ -152,7 +152,7 @@ func TestOutputLoadTemplate(t *testing.T) { } // delete template if it exists - client.request("DELETE", "/_template/libbeat", "", nil, nil) + client.Request("DELETE", "/_template/libbeat", "", nil, nil) // Make sure template is not yet there assert.False(t, client.CheckTemplate("libbeat")) diff --git a/libbeat/outputs/elasticsearch/output.go b/libbeat/outputs/elasticsearch/output.go index a2948ca022f..4ab3b03ca15 100644 --- a/libbeat/outputs/elasticsearch/output.go +++ b/libbeat/outputs/elasticsearch/output.go @@ -72,6 +72,92 @@ func New(beatName string, cfg *common.Config, topologyExpire int) (outputs.Outpu return output, nil } +// NewConnectedClient creates a new Elasticsearch client based on the given config. +// It uses the NewElasticsearchClients to create a list of clients then returns +// the first from the list that successfully connects. +func NewConnectedClient(cfg *common.Config) (*Client, error) { + clients, err := NewElasticsearchClients(cfg) + if err != nil { + return nil, err + } + + for _, client := range clients { + err = client.Connect(client.timeout) + if err != nil { + logp.Err("Error connecting to Elasticsearch: %s", client.Connection.URL) + continue + } + return &client, nil + } + return nil, fmt.Errorf("Couldn't connect to any of the configured Elasticsearch hosts") +} + +// NewElasticsearchClients returns a list of Elasticsearch clients based on the given +// configuration. It accepts the same configuration parameters as the output, +// except for the output specific configuration options (index, pipeline, +// template) .If multiple hosts are defined in the configuration, a client is returned +// for each of them. +func NewElasticsearchClients(cfg *common.Config) ([]Client, error) { + + hosts, err := modeutil.ReadHostList(cfg) + if err != nil { + return nil, err + } + + config := defaultConfig + if err := cfg.Unpack(&config); err != nil { + return nil, err + } + + tlsConfig, err := outputs.LoadTLSConfig(config.TLS) + if err != nil { + return nil, err + } + + var proxyURL *url.URL + if config.ProxyURL != "" { + proxyURL, err = parseProxyURL(config.ProxyURL) + if err != nil { + return nil, err + } + + logp.Info("Using proxy URL: %s", proxyURL) + } + + params := config.Params + if len(params) == 0 { + params = nil + } + + clients := []Client{} + for _, host := range hosts { + esURL, err := getURL(config.Protocol, config.Path, host) + if err != nil { + logp.Err("Invalid host param set: %s, Error: %v", host, err) + return nil, err + } + + client, err := NewClient(ClientSettings{ + URL: esURL, + Proxy: proxyURL, + TLS: tlsConfig, + Username: config.Username, + Password: config.Password, + Parameters: params, + Timeout: config.Timeout, + CompressionLevel: config.CompressionLevel, + }, nil) + if err != nil { + return clients, err + } + clients = append(clients, *client) + } + if len(clients) == 0 { + return clients, fmt.Errorf("No hosts defined in the Elasticsearch output") + } + return clients, nil +} + func (out *elasticsearchOutput) init( cfg *common.Config, topologyExpire int, diff --git a/libbeat/outputs/elasticsearch/testing.go b/libbeat/outputs/elasticsearch/testing.go new file mode 100644 index 00000000000..ecdab119174 --- /dev/null +++ b/libbeat/outputs/elasticsearch/testing.go @@ -0,0 +1,58 @@ +package elasticsearch + +import ( + "os" + "time" + + "github.com/elastic/beats/libbeat/outputs/outil" +) + +const ElasticsearchDefaultHost = "localhost" +const ElasticsearchDefaultPort = "9200" + +func GetEsPort() string { + port := os.Getenv("ES_PORT") + + if len(port) == 0 { + port = ElasticsearchDefaultPort + } + return port +} + +// Returns +func GetEsHost() string { + + host := os.Getenv("ES_HOST") + + if len(host) == 0 { + host = ElasticsearchDefaultHost + } + + return host +} + +func GetTestingElasticsearch() *Client { + var address = "http://" + GetEsHost() + ":" + GetEsPort() + username := os.Getenv("ES_USER") + pass := os.Getenv("ES_PASS") + client := newTestClientAuth(address, username, pass) + + // Load version number + client.Connect(3 * time.Second) + return client +} + +func newTestClientAuth(url, user, pass string) *Client { + client, err := NewClient(ClientSettings{ + URL: url, + Index: outil.MakeSelector(), + Username: user, + Password: pass, + Timeout: 60 * time.Second, + CompressionLevel: 3, + }, nil) + if err != nil { + panic(err) + } + return client +}