Skip to content

Commit

Permalink
Cherry-pick #10801 to 7.0: Beats event processing and default fields (#…
Browse files Browse the repository at this point in the history
…11155)

Cherry-pick of PR #10801 to 7.0 branch. Original message: 

This change moves the generation of the event processing into its own
distinct package, such that the actual publisher pipeline will not
define any processors anymore. A new instance of a publisher pipeline
must not add fields on its own.

With this change we convert the event processing pipeline into the 'Supporter'
pattern, which is already used for Index Management.
As different beats ask for slightly different behavior in the event
processing (e.g. normalize, default builtins and so on), the
`processing.Supporter` can be used for customizations.

Also fixes new fields accidentally being added to the monitoring outputs, as it separates the pipeline and processors.

Simplifies tests, but also adds a few test cases for dynamic fields and other settings.
  • Loading branch information
Steffen Siering committed Mar 8, 2019
1 parent 0d8df09 commit 4756ec1
Show file tree
Hide file tree
Showing 22 changed files with 1,052 additions and 903 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ The list below covers the major changes between 7.0.0-beta1 and master only.
be used to create an index selector. {pull}10347[10347]
- Remove support for loading dashboards to Elasticsearch 5. {pull}10451[10451]
- Remove support for deprecated `GenRootCmd` methods. {pull}10721[10721]
- Remove SkipNormalization, SkipAgentMetadata, SkipAddHostName. {pull}10801[10801] {pull}10769[10769]

==== Bugfixes

- Align default index between elasticsearch and logstash and kafka output. {pull}10841[10841]
- Fix duplication check for `append_fields` option. {pull}10959[10959]

==== Added

- Introduce processing.Support to instance.Setting. This allows Beats to fully modify the event processing. {pull}10801[10801]
16 changes: 9 additions & 7 deletions filebeat/channel/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,15 @@ func (f *OutletFactory) Create(p beat.Pipeline, cfg *common.Config, dynFields *c
}

client, err := p.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
EventMetadata: config.EventMetadata,
DynamicFields: dynFields,
Meta: meta,
Fields: fields,
Processor: processors,
Events: f.eventer,
PublishMode: beat.GuaranteedSend,
Processing: beat.ProcessingConfig{
EventMetadata: config.EventMetadata,
DynamicFields: dynFields,
Meta: meta,
Fields: fields,
Processor: processors,
},
Events: f.eventer,
})
if err != nil {
return nil, err
Expand Down
8 changes: 5 additions & 3 deletions heartbeat/monitors/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,11 @@ func (t *configuredJob) Start() {
}

t.client, err = t.monitor.pipelineConnector.ConnectWith(beat.ClientConfig{
EventMetadata: t.config.EventMetadata,
Processor: t.processors,
Fields: fields,
Processing: beat.ProcessingConfig{
EventMetadata: t.config.EventMetadata,
Processor: t.processors,
Fields: fields,
},
})
if err != nil {
logp.Err("could not start monitor: %v", err)
Expand Down
10 changes: 6 additions & 4 deletions journalbeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,12 @@ func New(
func (i *Input) Run() {
var err error
i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
EventMetadata: i.eventMeta,
Meta: nil,
Processor: i.processors,
PublishMode: beat.GuaranteedSend,
Processing: beat.ProcessingConfig{
EventMetadata: i.eventMeta,
Meta: nil,
Processor: i.processors,
},
ACKCount: func(n int) {
i.logger.Infof("journalbeat successfully published %d events", n)
},
Expand Down
50 changes: 26 additions & 24 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,7 @@ type Client interface {
type ClientConfig struct {
PublishMode PublishMode

// EventMetadata configures additional fields/tags to be added to published events.
EventMetadata common.EventMetadata

// Meta provides additional meta data to be added to the Meta field in the beat.Event
// structure.
Meta common.MapStr

// Fields provides additional 'global' fields to be added to every event
Fields common.MapStr

// DynamicFields provides additional fields to be added to every event, supporting live updates
DynamicFields *common.MapStrPointer

// Processors passes additional processor to the client, to be executed before
// the pipeline processors.
Processor ProcessorList
Processing ProcessingConfig

// WaitClose sets the maximum duration to wait on ACK, if client still has events
// active non-acknowledged events in the publisher pipeline.
Expand All @@ -72,14 +57,6 @@ type ClientConfig struct {
// Events configures callbacks for common client callbacks
Events ClientEventer

// By default events are normalized within processor pipeline,
// if the normalization step should be skipped set this to true.
SkipNormalization bool

// By default events are decorated with agent metadata.
// To skip adding that metadata set this to true.
SkipAgentMetadata bool

// ACK handler strategies.
// Note: ack handlers are run in another go-routine owned by the publisher pipeline.
// They should not block for too long, to not block the internal buffers for
Expand All @@ -101,6 +78,31 @@ type ClientConfig struct {
ACKLastEvent func(interface{})
}

// ProcessingConfig provides additional event processing settings a client can
// pass to the publisher pipeline on Connect. It is carried in the Processing
// field of ClientConfig.
type ProcessingConfig struct {
// EventMetadata configures additional fields/tags to be added to published events.
EventMetadata common.EventMetadata

// Meta provides additional meta data to be added to the Meta field in the beat.Event
// structure.
Meta common.MapStr

// Fields provides additional 'global' fields to be added to every event.
Fields common.MapStr

// DynamicFields provides additional fields to be added to every event, supporting live updates.
DynamicFields *common.MapStrPointer

// Processor passes additional processors to the client, to be executed before
// the pipeline processors.
Processor ProcessorList

// Private contains additional information to be passed to the processing
// pipeline builder.
// NOTE(review): the value is untyped here; presumably its meaning is defined
// by the concrete processing pipeline builder in use — confirm against it.
Private interface{}
}

// ClientEventer provides access to internal client events.
type ClientEventer interface {
Closing() // Closing indicates the client is being shutdown next
Expand Down
14 changes: 14 additions & 0 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import (
"github.com/elastic/beats/libbeat/paths"
"github.com/elastic/beats/libbeat/plugin"
"github.com/elastic/beats/libbeat/publisher/pipeline"
"github.com/elastic/beats/libbeat/publisher/processing"
svc "github.com/elastic/beats/libbeat/service"
"github.com/elastic/beats/libbeat/version"
sysinfo "github.com/elastic/go-sysinfo"
Expand All @@ -78,6 +79,8 @@ type Beat struct {

keystore keystore.Keystore
index idxmgmt.Supporter

processing processing.Supporter
}

type beatConfig struct {
Expand Down Expand Up @@ -310,6 +313,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
Logger: logp.L().Named("publisher"),
},
b.Config.Pipeline,
b.processing,
b.makeOutputFactory(b.Config.Output),
)

Expand Down Expand Up @@ -594,6 +598,16 @@ func (b *Beat) configure(settings Settings) error {
imFactory = idxmgmt.MakeDefaultSupport(settings.ILM)
}
b.index, err = imFactory(nil, b.Beat.Info, b.RawConfig)
if err != nil {
return err
}

processingFactory := settings.Processing
if processingFactory == nil {
processingFactory = processing.MakeDefaultBeatSupport(true)
}
b.processing, err = processingFactory(b.Info, logp.L().Named("processors"), b.RawConfig)

return err
}

Expand Down
3 changes: 3 additions & 0 deletions libbeat/cmd/instance/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/elastic/beats/libbeat/idxmgmt"
"github.com/elastic/beats/libbeat/idxmgmt/ilm"
"github.com/elastic/beats/libbeat/monitoring/report"
"github.com/elastic/beats/libbeat/publisher/processing"
)

// Settings contains basic settings for any beat to pass into GenRootCmd
Expand All @@ -40,4 +41,6 @@ type Settings struct {
// load custom index manager. The config object will be the Beats root configuration.
IndexManagement idxmgmt.SupportFactory
ILM ilm.SupportFactory

Processing processing.SupportFactory
}
9 changes: 8 additions & 1 deletion libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/elastic/beats/libbeat/outputs/transport"
"github.com/elastic/beats/libbeat/publisher/pipeline"
"github.com/elastic/beats/libbeat/publisher/processing"
"github.com/elastic/beats/libbeat/publisher/queue"
"github.com/elastic/beats/libbeat/publisher/queue/memqueue"
)
Expand Down Expand Up @@ -169,11 +170,16 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config)
outClient := outputs.NewFailoverClient(clients)
outClient = outputs.WithBackoff(outClient, config.Backoff.Init, config.Backoff.Max)

processing, err := processing.MakeDefaultSupport(true)(beat, log, common.NewConfig())
if err != nil {
return nil, err
}

pipeline, err := pipeline.New(
beat,
pipeline.Monitors{
Metrics: monitoring,
Logger: logp.NewLogger(selector),
Logger: log,
},
queueFactory,
outputs.Group{
Expand All @@ -184,6 +190,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config)
pipeline.Settings{
WaitClose: 0,
WaitCloseMode: pipeline.NoWaitOnClose,
Processors: processing,
})
if err != nil {
return nil, err
Expand Down
20 changes: 2 additions & 18 deletions libbeat/publisher/pipeline/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher/processing"
"github.com/elastic/beats/libbeat/publisher/queue"
)

Expand Down Expand Up @@ -60,6 +60,7 @@ func Load(
beatInfo beat.Info,
monitors Monitors,
config Config,
processors processing.Supporter,
makeOutput func(outputs.Observer) (string, outputs.Group, error),
) (*Pipeline, error) {
log := monitors.Logger
Expand All @@ -71,28 +72,11 @@ func Load(
log.Info("Dry run mode. All output types except the file based one are disabled.")
}

processors, err := processors.New(config.Processors)
if err != nil {
return nil, fmt.Errorf("error initializing processors: %v", err)
}

name := beatInfo.Name
settings := Settings{
WaitClose: 0,
WaitCloseMode: NoWaitOnClose,
Disabled: publishDisabled,
Processors: processors,
Annotations: Annotations{
Event: config.EventMetadata,
Builtin: common.MapStr{
"host": common.MapStr{
"name": name,
},
"ecs": common.MapStr{
"version": "1.0.0-beta2",
},
},
},
}

queueBuilder, err := createQueueBuilder(config.Queue, monitors)
Expand Down
Loading

0 comments on commit 4756ec1

Please sign in to comment.