Move filebeat to new publisher pipeline
- remove the filebeat/spooler and filebeat/publisher packages -> all spooling
  and reporting of published events is moved to the publisher pipeline.
  The main difference between the old spooler/publisher and the new pipeline
  is that the new publisher pipeline operates fully asynchronously.
- have filebeat register an eventACKer with the publisher pipeline.
  The eventACKer will forward state updates of ACKed events to the registrar
- filebeat uses beat.Event for events
- update util.Data to use beat.Event:
  - store state in the event.Private field for consumption by the registrar
- changes to the filebeat/channel package:
  - introduce OutletFactory: connects to the publisher pipeline, applying
    common prospector settings
  - remove Outleter.SetSignal and Outleter.OnEventSignal
  - add Outleter.Close
  - introduce SubOutlet (separate closing; see the sketch after the note
    below):
    - when a sub-outlet is closed, the original outlet is still active
    - if the underlying outlet is closed, the sub-outlet becomes closed as
      well (it cannot forward anymore)
  - introduce CloseOnSignal: close Outlet once a 'done' channel is closed
  - most functionality from harvester.Forwarder is moved into the
    outlet/publisher pipeline client
- fix: ensure client events listener properly installed

Note:
Outlet shutdown with prospectors and harvesters is somewhat delicate. There are
3 shutdown signals to take into account:
- filebeat.done
- harvester.done
- outDone (signal used to unblock prospectors from registrar on shutdown).

An outlet is shared between all harvesters of a prospector and the prospector
itself. If outDone is closed, all outlets will be closed, unblocking
potentially waiting harvesters and prospectors on filebeat shutdown.
The prospector uses a sub-outlet for sending state updates (closed on
filebeat.done). The harvester's sub-outlet is closed when harvester.done is
closed.
The signals are only required to unblock a harvester/prospector on exit. On
normal shutdown, the outlets are closed after all workers have finished.
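
For illustration, a minimal sketch of the sub-outlet and close-on-signal
behavior described above. This is not the actual filebeat/channel code; the
simplified Outleter shape (OnEvent/Close only) and all names here are
assumptions:

package channelsketch

import "sync"

// Outleter is a simplified stand-in for the real channel.Outleter.
type Outleter interface {
	OnEvent(data interface{}) bool
	Close() error
}

type subOutlet struct {
	parent Outleter
	once   sync.Once
	done   chan struct{}
}

// SubOutlet wraps an outlet so it can be closed independently:
// closing the sub-outlet leaves the parent outlet active.
func SubOutlet(parent Outleter) Outleter {
	return &subOutlet{parent: parent, done: make(chan struct{})}
}

func (s *subOutlet) Close() error {
	s.once.Do(func() { close(s.done) })
	return nil
}

func (s *subOutlet) OnEvent(data interface{}) bool {
	select {
	case <-s.done:
		return false // sub-outlet closed, parent stays usable
	default:
	}
	if !s.parent.OnEvent(data) {
		// The underlying outlet is closed, so the sub-outlet becomes
		// closed as well and cannot forward anymore.
		s.Close()
		return false
	}
	return true
}

// CloseOnSignal closes the outlet once the 'done' channel is closed,
// unblocking a harvester/prospector waiting to publish on shutdown.
func CloseOnSignal(out Outleter, done <-chan struct{}) Outleter {
	go func() {
		<-done
		out.Close()
	}()
	return out
}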
urso committed Jul 11, 2017
1 parent 37f4abb commit 8e66d95
Showing 43 changed files with 643 additions and 1,089 deletions.
41 changes: 41 additions & 0 deletions filebeat/beater/acker.go
@@ -0,0 +1,41 @@
package beater

import (
	"github.com/elastic/beats/filebeat/util"
	"github.com/elastic/beats/libbeat/publisher/beat"
)

// eventACKer handles publisher pipeline ACKs and forwards
// them to the registrar.
type eventACKer struct {
	out successLogger
}

type successLogger interface {
	Published(events []*util.Data) bool
}

func newEventACKer(out successLogger) *eventACKer {
	return &eventACKer{out: out}
}

func (a *eventACKer) ackEvents(events []beat.Event) {
	data := make([]*util.Data, 0, len(events))
	for _, event := range events {
		p := event.Private
		if p == nil {
			continue
		}

		datum, ok := p.(*util.Data)
		if !ok || !datum.HasState() {
			continue
		}

		data = append(data, datum)
	}

	if len(data) > 0 {
		a.out.Published(data)
	}
}
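
To make the ACK contract concrete, here is a small hypothetical usage sketch;
printLogger and exampleACK are made-up names standing in for the
registrar-facing successLogger:

package beater

import (
	"fmt"

	"github.com/elastic/beats/filebeat/util"
	"github.com/elastic/beats/libbeat/publisher/beat"
)

// printLogger is an illustrative stand-in for the registrar-facing
// successLogger.
type printLogger struct{}

func (printLogger) Published(events []*util.Data) bool {
	fmt.Printf("registrar would persist %d state update(s)\n", len(events))
	return true
}

func exampleACK() {
	acker := newEventACKer(printLogger{})

	// Only ACKed events whose Private field carries a stateful *util.Data
	// are forwarded; neither event below qualifies, so Published is never
	// called for this batch.
	acker.ackEvents([]beat.Event{
		{Private: nil},            // ignored: no state attached
		{Private: "not-utildata"}, // ignored: unexpected type
	})
}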
72 changes: 0 additions & 72 deletions filebeat/beater/channels.go
@@ -2,26 +2,11 @@ package beater

 import (
 	"sync"
-	"sync/atomic"

 	"github.com/elastic/beats/filebeat/registrar"
-	"github.com/elastic/beats/filebeat/spooler"
 	"github.com/elastic/beats/filebeat/util"
 )

-type spoolerOutlet struct {
-	wg      *sync.WaitGroup
-	done    <-chan struct{}
-	spooler *spooler.Spooler
-
-	isOpen int32 // atomic indicator
-}
-
-type publisherChannel struct {
-	done chan struct{}
-	ch   chan []*util.Data
-}
-
 type registrarLogger struct {
 	done chan struct{}
 	ch   chan<- []*util.Data
@@ -31,63 +16,6 @@ type finishedLogger struct {
 	wg *sync.WaitGroup
 }

-func newSpoolerOutlet(
-	done <-chan struct{},
-	s *spooler.Spooler,
-	wg *sync.WaitGroup,
-) *spoolerOutlet {
-	return &spoolerOutlet{
-		done:    done,
-		spooler: s,
-		wg:      wg,
-		isOpen:  1,
-	}
-}
-
-func (o *spoolerOutlet) OnEvent(data *util.Data) bool {
-	open := atomic.LoadInt32(&o.isOpen) == 1
-	if !open {
-		return false
-	}
-
-	if o.wg != nil {
-		o.wg.Add(1)
-	}
-
-	select {
-	case <-o.done:
-		if o.wg != nil {
-			o.wg.Done()
-		}
-		atomic.StoreInt32(&o.isOpen, 0)
-		return false
-	case o.spooler.Channel <- data:
-		return true
-	}
-}
-
-func newPublisherChannel() *publisherChannel {
-	return &publisherChannel{
-		done: make(chan struct{}),
-		ch:   make(chan []*util.Data, 1),
-	}
-}
-
-func (c *publisherChannel) Close() { close(c.done) }
-func (c *publisherChannel) Send(events []*util.Data) bool {
-	select {
-	case <-c.done:
-		// set ch to nil, so no more events will be sent after the close
-		// signal has been processed the first time.
-		// Note: nil channels block, so only the done channel will actively
-		// report 'closed'.
-		c.ch = nil
-		return false
-	case c.ch <- events:
-		return true
-	}
-}
-
 func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger {
 	return &registrarLogger{
 		done: make(chan struct{}),
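
The nil-channel trick in the removed publisherChannel.Send above relies on the
fact that a send on a nil channel blocks forever, so after ch is set to nil
only the closed done channel can ever win the select. A standalone
demonstration:

package main

import "fmt"

func main() {
	done := make(chan struct{})
	var ch chan int // nil channel: sends on it block forever

	close(done)

	select {
	case <-done:
		fmt.Println("done wins: a closed channel is always ready")
	case ch <- 1:
		fmt.Println("never happens: send on a nil channel blocks")
	}
}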
45 changes: 16 additions & 29 deletions filebeat/beater/filebeat.go
@@ -11,14 +11,13 @@ import (
 	"github.com/elastic/beats/libbeat/common"
 	"github.com/elastic/beats/libbeat/logp"
 	"github.com/elastic/beats/libbeat/outputs/elasticsearch"
+	pub "github.com/elastic/beats/libbeat/publisher/beat"

 	"github.com/elastic/beats/filebeat/channel"
 	cfg "github.com/elastic/beats/filebeat/config"
 	"github.com/elastic/beats/filebeat/crawler"
 	"github.com/elastic/beats/filebeat/fileset"
-	"github.com/elastic/beats/filebeat/publisher"
 	"github.com/elastic/beats/filebeat/registrar"
-	"github.com/elastic/beats/filebeat/spooler"

 	// Add filebeat level processors
 	_ "github.com/elastic/beats/filebeat/processor/add_kubernetes_metadata"
@@ -165,21 +164,21 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
 	// Make sure all events that were published in
 	registrarChannel := newRegistrarLogger(registrar)

-	// Channel from spooler to harvester
-	publisherChan := newPublisherChannel()
-
-	// Publishes event to output
-	publisher := publisher.New(config.PublishAsync, publisherChan.ch, registrarChannel, b.Publisher)
-
-	// Init and Start spooler: Harvesters dump events into the spooler.
-	spooler, err := spooler.New(config, publisherChan)
+	err = b.Publisher.SetACKHandler(pub.PipelineACKHandler{
+		ACKEvents: newEventACKer(registrarChannel).ackEvents,
+	})
 	if err != nil {
-		logp.Err("Could not init spooler: %v", err)
+		logp.Err("Failed to install the registry with the publisher pipeline: %v", err)
 		return err
 	}

-	outlet := channel.NewOutlet(fb.done, spooler.Channel, wgEvents)
-	crawler, err := crawler.New(outlet, config.Prospectors, b.Info.Version, fb.done, *once)
+	outDone := make(chan struct{}) // outDone closes down all active pipeline connections
+	crawler, err := crawler.New(
+		channel.NewOutletFactory(outDone, b.Publisher, wgEvents).Create,
+		config.Prospectors,
+		b.Info.Version,
+		fb.done,
+		*once)
 	if err != nil {
 		logp.Err("Could not init crawler: %v", err)
 		return err
@@ -194,32 +193,20 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
 	if err != nil {
 		return fmt.Errorf("Could not start registrar: %v", err)
 	}

 	// Stopping registrar will write last state
 	defer registrar.Stop()

-	// Start publisher
-	publisher.Start()
-	// Stopping publisher (might potentially drop items)
 	defer func() {
 		// Close the registrar logger first, to make sure no more events
 		// arrive at the registrar; registrarChannel must be closed first to
 		// potentially unblock (pretty unlikely) the publisher
 		registrarChannel.Close()
-		publisher.Stop()
+		close(outDone) // finally close all active connections to publisher pipeline
 	}()

-	// Starting spooler
-	spooler.Start()
-
-	// Stopping spooler will flush items
-	defer func() {
-		// Wait for all events to be processed or timeout
-		waitEvents.Wait()
-
-		// Closes publisher so no further events can be sent
-		publisherChan.Close()
-		// Stopping spooler
-		spooler.Stop()
-	}()
+	// Wait for all events to be processed or timeout
+	defer waitEvents.Wait()

 	// Create a ES connection factory for dynamic modules pipeline loading
 	var pipelineLoaderFactory fileset.PipelineLoaderFactory
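
The declaration order of the defers above matters because deferred calls run
last-in, first-out: waitEvents.Wait() runs first, then the registrar logger
and pipeline connections are closed, and registrar.Stop() runs last so the
final states still get written. A tiny sketch of that unwinding order (the
printed names mirror the calls above):

package main

import "fmt"

func main() {
	defer fmt.Println("3: registrar.Stop()        - write final states")
	defer fmt.Println("2: registrarChannel.Close(), close(outDone)")
	defer fmt.Println("1: waitEvents.Wait()       - drain in-flight events")
	fmt.Println("0: Run() body finishes")
}

// Output:
// 0: Run() body finishes
// 1: waitEvents.Wait()       - drain in-flight events
// 2: registrarChannel.Close(), close(outDone)
// 3: registrar.Stop()        - write final states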
128 changes: 128 additions & 0 deletions filebeat/channel/factory.go
@@ -0,0 +1,128 @@
package channel

import (
	"sync"

	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/processors"
	"github.com/elastic/beats/libbeat/publisher/bc/publisher"
	"github.com/elastic/beats/libbeat/publisher/beat"
)

type OutletFactory struct {
	done     <-chan struct{}
	pipeline publisher.Publisher

	eventer  beat.ClientEventer
	wgEvents *sync.WaitGroup
}

// clientEventer adjusts wgEvents if events are dropped during shutdown.
type clientEventer struct {
	wgEvents *sync.WaitGroup
}

// prospectorOutletConfig defines common prospector settings
// for the publisher pipeline.
type prospectorOutletConfig struct {
	// event processing
	common.EventMetadata `config:",inline"` // Fields and tags to add to events.
	Processors           processors.PluginConfig `config:"processors"`

	// implicit event fields
	Type string `config:"type"` // prospector.type

	// hidden filebeat modules settings
	Module  string `config:"_module_name"`  // hidden setting
	Fileset string `config:"_fileset_name"` // hidden setting

	// Output metadata settings
	Pipeline string `config:"pipeline"` // ES Ingest pipeline name
}

// NewOutletFactory creates a new outlet factory for
// connecting a prospector to the publisher pipeline.
func NewOutletFactory(
	done <-chan struct{},
	pipeline publisher.Publisher,
	wgEvents *sync.WaitGroup,
) *OutletFactory {
	o := &OutletFactory{
		done:     done,
		pipeline: pipeline,
		wgEvents: wgEvents,
	}

	if wgEvents != nil {
		o.eventer = &clientEventer{wgEvents}
	}

	return o
}

// Create builds a new Outleter, while applying common prospector settings.
// Prospectors and all harvesters use the same pipeline client instance.
// This guarantees ordering between events as required by the registrar for
// file.State updates.
func (f *OutletFactory) Create(cfg *common.Config) (Outleter, error) {
	config := prospectorOutletConfig{}
	if err := cfg.Unpack(&config); err != nil {
		return nil, err
	}

	processors, err := processors.New(config.Processors)
	if err != nil {
		return nil, err
	}

	setMeta := func(to common.MapStr, key, value string) {
		if value != "" {
			to[key] = value
		}
	}

	meta := common.MapStr{}
	setMeta(meta, "pipeline", config.Pipeline)

	fields := common.MapStr{}
	setMeta(fields, "module", config.Module)
	setMeta(fields, "name", config.Fileset)
	if len(fields) > 0 {
		fields = common.MapStr{
			"fileset": fields,
		}
	}
	if config.Type != "" {
		fields["prospector"] = common.MapStr{
			"type": config.Type,
		}
	}

	client, err := f.pipeline.ConnectX(beat.ClientConfig{
		PublishMode:   beat.GuaranteedSend,
		EventMetadata: config.EventMetadata,
		Meta:          meta,
		Fields:        fields,
		Processor:     processors,
		Events:        f.eventer,
	})
	if err != nil {
		return nil, err
	}

	outlet := newOutlet(client, f.wgEvents)
	if f.done != nil {
		return CloseOnSignal(outlet, f.done), nil
	}
	return outlet, nil
}

func (*clientEventer) Closing()   {}
func (*clientEventer) Closed()    {}
func (*clientEventer) Published() {}

func (c *clientEventer) FilteredOut(_ beat.Event) {}
func (c *clientEventer) DroppedOnPublish(_ beat.Event) {
	c.wgEvents.Done()
}
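
The fields assembly in Create is easiest to see in a standalone sketch; the
nginx/access/log values are invented for illustration, and plain maps stand in
for common.MapStr:

package main

import "fmt"

// Sketch of how Create assembles implicit event fields from the prospector
// type and the hidden module settings.
func main() {
	setMeta := func(to map[string]interface{}, key, value string) {
		if value != "" {
			to[key] = value
		}
	}

	fields := map[string]interface{}{}
	setMeta(fields, "module", "nginx") // from the hidden _module_name setting
	setMeta(fields, "name", "access")  // from the hidden _fileset_name setting
	if len(fields) > 0 {
		fields = map[string]interface{}{"fileset": fields}
	}
	fields["prospector"] = map[string]interface{}{"type": "log"} // prospector.type

	fmt.Println(fields)
	// map[fileset:map[module:nginx name:access] prospector:map[type:log]]
}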
12 changes: 8 additions & 4 deletions filebeat/channel/interface.go
@@ -1,11 +1,15 @@
 package channel

-import "github.com/elastic/beats/filebeat/util"
+import (
+	"github.com/elastic/beats/filebeat/util"
+	"github.com/elastic/beats/libbeat/common"
+)
+
+// OutleterFactory is used to create a new Outleter instance
+type OutleterFactory func(*common.Config) (Outleter, error)

 // Outleter is the outlet for a prospector
 type Outleter interface {
-	SetSignal(signal <-chan struct{})
-	OnEventSignal(data *util.Data) bool
+	Close() error
 	OnEvent(data *util.Data) bool
 	Copy() Outleter
 }