Skip to content

Commit

Permalink
Load Filebeat modules pipelines on -setup (#3394)
Browse files Browse the repository at this point in the history
* Load Filebeat modules pipelines on -setup

This adds the `-setup` CLI flag, which, for now, makes Filebeat load the
pipelines at startup. In case Elasticsearch is not available when
Filebeat is started with `-setup`, Filebeat will exit with an error.

This also exposes an Elasticsearch client from the output.

* Use an interface instead of the ES client
  • Loading branch information
tsg authored and andrewkroh committed Jan 20, 2017
1 parent 77cec5a commit c6c5e5c
Show file tree
Hide file tree
Showing 15 changed files with 344 additions and 249 deletions.
28 changes: 27 additions & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/elasticsearch"

cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/crawler"
Expand All @@ -18,7 +19,10 @@ import (
"github.com/elastic/beats/filebeat/spooler"
)

var once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF")
var (
once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF")
setup = flag.Bool("setup", false, "Run the setup phase for the modules")
)

// Filebeat is a beater object. Contains all objects needed to run the beat
type Filebeat struct {
Expand Down Expand Up @@ -67,11 +71,33 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
return fb, nil
}

// Setup is called on user request (the -setup flag) to do the initial Beat setup.
func (fb *Filebeat) Setup(b *beat.Beat) error {
esConfig := b.Config.Output["elasticsearch"]
if esConfig == nil || !esConfig.Enabled() {
return fmt.Errorf("Setup requested but the Elasticsearch output is not configured/enabled")
}
esClient, err := elasticsearch.NewConnectedClient(esConfig)
if err != nil {
return fmt.Errorf("Error creating ES client: %v", err)
}
defer esClient.Close()

return fb.moduleRegistry.Setup(esClient)
}

// Run allows the beater to be run as a beat.
func (fb *Filebeat) Run(b *beat.Beat) error {
var err error
config := fb.config

if *setup {
err = fb.Setup(b)
if err != nil {
return err
}
}

waitFinished := newSignalWait()
waitEvents := newSignalWait()

Expand Down
171 changes: 0 additions & 171 deletions filebeat/filebeat.py

This file was deleted.

27 changes: 26 additions & 1 deletion filebeat/fileset/fileset.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package fileset

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -242,7 +243,31 @@ func (fs *Fileset) getPipelineID() (string, error) {
return "", fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err)
}

return fmt.Sprintf("%s-%s-%s", fs.mcfg.Module, fs.name, removeExt(filepath.Base(path))), nil
return formatPipelineID(fs.mcfg.Module, fs.name, path), nil
}

func (fs *Fileset) GetPipeline() (pipelineID string, content map[string]interface{}, err error) {
path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline)
if err != nil {
return "", nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err)
}

f, err := os.Open(filepath.Join(fs.modulePath, fs.name, path))
if err != nil {
return "", nil, fmt.Errorf("Error reading pipeline file %s: %v", path, err)
}

dec := json.NewDecoder(f)
err = dec.Decode(&content)
if err != nil {
return "", nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err)
}
return formatPipelineID(fs.mcfg.Module, fs.name, path), content, nil
}

// formatPipelineID generates the ID to be used for the pipeline ID in Elasticsearch
func formatPipelineID(module, fileset, path string) string {
return fmt.Sprintf("%s-%s-%s", module, fileset, removeExt(filepath.Base(path)))
}

// removeExt returns the file name without the extension. If no dot is found,
Expand Down
17 changes: 15 additions & 2 deletions filebeat/fileset/fileset_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build !integration

package fileset

import (
Expand Down Expand Up @@ -176,7 +178,18 @@ func TestGetProspectorConfigNginxOverrides(t *testing.T) {
assert.True(t, cfg.HasField("paths"))
assert.True(t, cfg.HasField("exclude_files"))
assert.True(t, cfg.HasField("close_eof"))
pipeline_id := fs.vars["beat"].(map[string]interface{})["pipeline_id"]
assert.Equal(t, "nginx-access-with_plugins", pipeline_id)
pipelineID := fs.vars["beat"].(map[string]interface{})["pipeline_id"]
assert.Equal(t, "nginx-access-with_plugins", pipelineID)

}

func TestGetPipelineNginx(t *testing.T) {
fs := getModuleForTesting(t, "nginx", "access")
assert.NoError(t, fs.Read())

pipelineID, content, err := fs.GetPipeline()
assert.NoError(t, err)
assert.Equal(t, "nginx-access-with_plugins", pipelineID)
assert.Contains(t, content, "description")
assert.Contains(t, content, "processors")
}
33 changes: 33 additions & 0 deletions filebeat/fileset/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,36 @@ func (reg *ModuleRegistry) GetProspectorConfigs() ([]*common.Config, error) {
}
return result, nil
}

// PipelineLoader is a subset of the Elasticsearch client API capable of loading
// the pipelines.
type PipelineLoader interface {
LoadJSON(path string, json map[string]interface{}) error
}

// Setup is called on -setup and loads the pipelines for each configured fileset.
func (reg *ModuleRegistry) Setup(esClient PipelineLoader) error {
for module, filesets := range reg.registry {
for name, fileset := range filesets {
pipelineID, content, err := fileset.GetPipeline()
if err != nil {
return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err)
}
err = loadPipeline(esClient, pipelineID, content)
if err != nil {
return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err)
}
}
}
return nil
}

func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}) error {
path := "/_ingest/pipeline/" + pipelineID
err := esClient.LoadJSON(path, content)
if err != nil {
return fmt.Errorf("couldn't load template: %v", err)
}
logp.Info("Elasticsearch pipeline with ID '%s' loaded", pipelineID)
return nil
}
58 changes: 58 additions & 0 deletions filebeat/fileset/modules_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// +build integration

package fileset

import (
"path/filepath"
"testing"

"github.com/elastic/beats/libbeat/outputs/elasticsearch"
"github.com/stretchr/testify/assert"
)

func TestLoadPipeline(t *testing.T) {
client := elasticsearch.GetTestingElasticsearch()
client.Request("DELETE", "/_ingest/pipeline/my-pipeline-id", "", nil, nil)

content := map[string]interface{}{
"description": "describe pipeline",
"processors": []map[string]interface{}{
{
"set": map[string]interface{}{
"field": "foo",
"value": "bar",
},
},
},
}

err := loadPipeline(client, "my-pipeline-id", content)
assert.NoError(t, err)

status, _, _ := client.Request("GET", "/_ingest/pipeline/my-pipeline-id", "", nil, nil)
assert.Equal(t, 200, status)
}

func TestSetupNginx(t *testing.T) {
client := elasticsearch.GetTestingElasticsearch()
client.Request("DELETE", "/_ingest/pipeline/nginx-access-with_plugins", "", nil, nil)
client.Request("DELETE", "/_ingest/pipeline/nginx-error-pipeline", "", nil, nil)

modulesPath, err := filepath.Abs("../module")
assert.NoError(t, err)

configs := []ModuleConfig{
ModuleConfig{Module: "nginx"},
}

reg, err := newModuleRegistry(modulesPath, configs, nil)
assert.NoError(t, err)

err = reg.Setup(client)
assert.NoError(t, err)

status, _, _ := client.Request("GET", "/_ingest/pipeline/nginx-access-with_plugins", "", nil, nil)
assert.Equal(t, 200, status)
status, _, _ = client.Request("GET", "/_ingest/pipeline/nginx-error-pipeline", "", nil, nil)
assert.Equal(t, 200, status)
}
2 changes: 2 additions & 0 deletions filebeat/fileset/modules_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build !integration

package fileset

import (
Expand Down
Loading

0 comments on commit c6c5e5c

Please sign in to comment.