
Move filebeat to new publisher pipeline #4644

Merged: 2 commits merged into elastic:master on Jul 11, 2017

Conversation

@urso urso commented Jul 10, 2017

  • remove the filebeat/spooler and filebeat/publisher packages -> all spooling and
    reporting of published events is moved to the publisher pipeline.
    The difference between the old spooler/publisher and the new pipeline is that
    the new publisher pipeline operates fully asynchronously
  • have filebeat register an eventACKer with the publisher pipeline.
    The eventACKer will forward state updates of ACKed events to the registrar
  • filebeat uses beat.Event for events
  • update util.Data to use beat.Event:
    • store state in the event.Private field for consumption by the registry
  • changes to the filebeat/channel package:
    • introduce OutletFactory: connect to the publisher pipeline, applying common
      prospector settings
    • remove Outleter.SetSignal and Outleter.OnEventSignal
    • add Outleter.Close
    • introduce SubOutlet (separate closing; see the sketch after the note below):
      • when a sub-outlet is closed, the original outlet is still active
      • if the underlying outlet is closed, the sub-outlet becomes closed as well
        (it cannot forward anymore)
    • introduce CloseOnSignal: close the Outlet once a 'done' channel is closed
    • most functionality from harvester.Forwarder is moved into the
      outlet/publisher pipeline client

Note:
Outlet shutdown with prospectors and harvesters is somewhat delicate. There are
3 shutdown signals to take into account:

  • filebeat.done
  • harvester.done
  • outDone (signal used to unblock prospectors from the registrar on shutdown).

An outlet is shared between all harvesters of a prospector and the prospector
itself. If outDone is closed, all outlets will be closed, unblocking
potentially waiting harvesters and prospectors on filebeat shutdown.
The prospector uses a sub-outlet for sending state updates (closed on
filebeat.done). The harvester's sub-outlet is closed when harvester.done is
closed.
The signals are only required to unblock a harvester/prospector on exit. On
normal shutdown, the outlets are closed after all workers have finished.
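
A minimal sketch of the SubOutlet and CloseOnSignal semantics described above (illustrative only; the real filebeat/channel code differs in event types and error handling, and the reduced Outleter interface shown here is an assumption):

```go
// Illustrative sketch; not the actual filebeat/channel implementation.
package channel

import "sync"

// Outleter is reduced to the two methods relevant here.
type Outleter interface {
	OnEvent(event interface{}) bool
	Close() error
}

// subOutlet can be closed without closing its parent outlet. Once the parent
// stops accepting events, the sub-outlet becomes closed as well, since it
// cannot forward anymore.
type subOutlet struct {
	parent Outleter
	mu     sync.Mutex
	closed bool
}

// SubOutlet wraps an outlet for independent closing.
func SubOutlet(parent Outleter) Outleter {
	return &subOutlet{parent: parent}
}

func (s *subOutlet) OnEvent(event interface{}) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return false
	}
	if ok := s.parent.OnEvent(event); !ok {
		s.closed = true // underlying outlet closed -> sub-outlet closed too
		return false
	}
	return true
}

func (s *subOutlet) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.closed = true // the parent outlet stays active
	return nil
}

// CloseOnSignal closes the outlet once the 'done' channel is closed, so a
// blocked harvester/prospector gets unblocked on shutdown.
func CloseOnSignal(out Outleter, done <-chan struct{}) Outleter {
	go func() {
		<-done
		out.Close()
	}()
	return out
}
```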

@urso urso added Filebeat Filebeat review labels Jul 10, 2017
@urso urso mentioned this pull request Jul 10, 2017
@urso urso force-pushed the pipeline/filebeat branch 2 times, most recently from cbb9b68 to 03c57bf on July 11, 2017 04:30
Module string `config:"_module_name"` // hidden option to set the module name
Fileset string `config:"_fileset_name"` // hidden option to set the fileset name
Processors processors.PluginConfig `config:"processors"`
Type string `config:"type"`
}

// NewForwarder creates a new forwarder instances and initialises processors if configured
func NewForwarder(cfg *common.Config, outlet Outlet) (*Forwarder, error) {

@urso urso (Author) commented:

Note: the forwarder is going to be removed in future iterations.

@ruflin ruflin (Member) left a comment:

LGTM. It feels strange to get rid of the spooler, but it's a great step forward to have this in the publisher now, as it means other beats can make use of it too. Small step for beats, big step for filebeat :-)

During review I left a few comments/questions that popped up. Most of them got answered as I read more of the code, but it would be good if you could have a quick look and confirm that this is the case.

// Stopping publisher (might potentially drop items)
defer func() {
	// Closes first the registrar logger to make sure no more events arrive at the registrar
	// registrarChannel must be closed first to potentially unblock (pretty unlikely) the publisher
	registrarChannel.Close()
	publisher.Stop()
	defer close(outDone) // finally close all active connections to publisher pipeline

@ruflin ruflin (Member) commented:

Why is this a defer inside the defer?

@urso urso (Author) commented Jul 11, 2017:

Oops, copy'n'paste. Interestingly, the effect will be the same :)
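
For reference, a tiny standalone illustration of why the effect is the same either way: a defer registered inside a deferred function runs when that deferred function returns, so the final step still executes last.

```go
package main

import "fmt"

func main() {
	defer func() {
		fmt.Println("close registrar channel")
		fmt.Println("stop publisher")
		// With or without this extra defer, the line below runs when the
		// outer deferred function returns, i.e. still after "stop publisher".
		defer fmt.Println("close outDone")
	}()
	fmt.Println("running")
}

// Output:
// running
// close registrar channel
// stop publisher
// close outDone
```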

Module string `config:"_module_name"` // hidden option to set the module name
Fileset string `config:"_fileset_name"` // hidden option to set the fileset name
Processors processors.PluginConfig `config:"processors"`
Type string `config:"type"`

@ruflin ruflin (Member) commented:

Trying to understand why type is still here and all the others were moved.

@urso urso (Author) commented:

The forwarder is subject to removal in future iterations. It's there for the PutValue call when publishing. In the meantime I added ClientConfig.Fields to register fields to be added to every event on publish.
Basically the Forwarder will be removed in favor of channel.Outleter, which will in turn be removed in favor of beat.Client (once we get a more sane registry handling).
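
A rough sketch of registering per-prospector fields at connect time (illustrative only: it leans on the ClientConfig.Fields option mentioned above; the helper and parameter names are not from this PR):

```go
package example

import (
	"github.com/elastic/beats/libbeat/beat"
	"github.com/elastic/beats/libbeat/common"
)

// connectWithProspectorType connects to the publisher pipeline and registers
// fields that are added to every event on publish. Illustrative sketch only.
func connectWithProspectorType(pipeline beat.Pipeline, prospectorType string) (beat.Client, error) {
	return pipeline.ConnectWith(beat.ClientConfig{
		Fields: common.MapStr{
			"prospector": common.MapStr{"type": prospectorType},
		},
	})
}
```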


// run the filters before sending to spooler
data.Event = f.Processors.RunBC(data.Event)
data.Event.PutValue("prospector.type", f.Config.Type)

@ruflin ruflin (Member) commented:

Could we also add the type in the outlet? We probably need the type in other places too?

}

ok := f.Outlet.OnEventSignal(data)
ok := f.Outlet.OnEvent(data)

@ruflin ruflin (Member) commented:

For my own note: I must check whether there are still 2 different methods here, one blocking and one non-blocking, to make sure the harvester is only closed when data is ACKed (or in the queue ...)

@urso urso (Author) commented:

The harvester gets a publishState, which uses the stateOutlet. The stateOutlet (owned by the prospector) and the harvesters' Outlet are each closed on different signals... so many hoops...
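
Roughly, the wiring looks like this (an illustrative sketch that reuses the reduced channel types from the sketch after the PR description; variable and function names are made up):

```go
package channel

// wireOutlets derives the two independently closed outlets from the shared
// prospector outlet. Illustrative only; not the actual prospector code.
func wireOutlets(shared Outleter, filebeatDone, harvesterDone <-chan struct{}) (stateOutlet, harvesterOutlet Outleter) {
	// Prospector-owned sub-outlet for state updates, closed on filebeat.done.
	stateOutlet = CloseOnSignal(SubOutlet(shared), filebeatDone)

	// Per-harvester sub-outlet, closed on harvester.done.
	harvesterOutlet = CloseOnSignal(SubOutlet(shared), harvesterDone)

	return stateOutlet, harvesterOutlet
}
```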

var err error

outlet = channel.CloseOnSignal(outlet, h.done)

@ruflin ruflin (Member) commented:

Comment on line 113 is now in the wrong place. I put this one as early as possible because of our still unsolved race condition.

@urso urso (Author) commented:

fixing.

@@ -43,27 +42,31 @@ def test_shutdown_wait_ok(self):

# Wait until first flush
self.wait_until(
lambda: self.log_contains_count("Flushing spooler") > 1,
lambda: self.log_contains_count("Publish event") > 200,

@ruflin ruflin (Member) commented:

The problem with setting 200 here instead of one is that the likelihood of all log lines already being read increases. I want to try to shut down as early as possible. What is the reason you increased this one?

@urso urso (Author) commented:

Before, it was checking for the 'Flushing spooler' message, which can correspond to any number of events (up to the spooler size). As the spooler and any kind of flushing are gone, I had to use a replacement based on the number of events being published. The test file contains around 50k events. 200 should be ok, but we can decrease it.


@unittest.skip("Skipping unreliable test")
# we allow for a potential race in the harvester shutdown here.

@ruflin ruflin (Member) commented:

Can you elaborate on this one?

@urso urso (Author) commented:

see outlet.OnEvent in channel/outlet.go.

	// Note: race condition on shutdown:
	//  The underlying beat.Client is asynchronous. Without proper ACK
	//  handler we can not tell if the event made it 'through' or the client
	//  close has been completed before sending. In either case,
	//  we report 'false' here, indicating the event eventually being dropped.
	//  Returning false here, prevents the harvester from updating the state
	//  to the most recently published events. Therefore, on shutdown the harvester
	//  might report an old/outdated state update to the registry, overwriting the
	//  most recently published offset in the registry on shutdown.
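
A sketch of the shape of that race (illustrative only; field and method names are assumptions, not the actual channel/outlet.go code):

```go
package example

import "github.com/elastic/beats/libbeat/beat"

type outlet struct {
	client beat.Client
	done   <-chan struct{}
}

func (o *outlet) OnEvent(event beat.Event) bool {
	select {
	case <-o.done:
		// Already shutting down: report false so the caller does not record
		// this event's offset as published.
		return false
	default:
	}

	// beat.Client.Publish is asynchronous: after this call we cannot tell
	// whether the event made it through or the client was closed first.
	o.client.Publish(event)

	select {
	case <-o.done:
		// Shutdown raced with the publish; be conservative and report false,
		// even though the event may in fact have gone through. The harvester
		// then skips the state update, which is why an old offset can end up
		// in the registry on shutdown.
		return false
	default:
		return true
	}
}
```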

if innerMap, ok := tryToMapStr(v); ok {
	result[k] = innerMap.Clone()
} else {
	result[k] = v

@ruflin ruflin (Member) commented:

Kind of surprised to see a mapstr change in this PR. Why was that needed?

@urso urso (Author) commented Jul 11, 2017:

Oops, this kind of leaked into this PR :)
I found the old processors ordering potentially overwriting a globally shared MapStr and was forced to introduce some Clone() calls on shared MapStr instances, so the processors adding/removing fields always operate on copies, not on the original MapStr. The amount of fmt.Errorf calls was so expensive it totally killed throughput.
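
A minimal sketch of the shared-MapStr problem and the Clone() fix (illustrative only):

```go
package main

import (
	"fmt"

	"github.com/elastic/beats/libbeat/common"
)

func main() {
	// Globally shared fields, referenced by more than one event.
	shared := common.MapStr{"fields": common.MapStr{"env": "prod"}}

	aliased := shared        // same underlying maps
	copied := shared.Clone() // deep copy: nested MapStr values are cloned too

	// A processor modifying the clone leaves the shared instance untouched.
	copied.Put("fields.env", "staging")

	fmt.Println(aliased) // the shared instance still says env: "prod"
	fmt.Println(copied)  // only the clone says env: "staging"
}
```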

@@ -219,9 +219,9 @@ func (b *Broker) eventLoop() {
events = b.events
}

b.logger.Debug("active events: ", activeEvents)
// b.logger.Debug("active events: ", activeEvents)

@ruflin ruflin (Member) commented:

Commented out on purpose? Also below.

@urso urso (Author) commented:

Yeah, loads of debug statements are commented out in the fast path. I still kept them so I can re-enable them if I really need them.

@@ -264,7 +264,7 @@ func (b *eventBuffer) Len() int {
}

func (b *eventBuffer) Set(idx int, event publisher.Event, st clientState) {
b.logger.Debugf("insert event: idx=%v, seq=%v\n", idx, st.seq)
// b.logger.Debugf("insert event: idx=%v, seq=%v\n", idx, st.seq)

@ruflin ruflin (Member) commented:

Same question here: commented out on purpose?

urso added 2 commits July 11, 2017 13:37
The commit messages repeat the PR description above, with one additional item:

- fix: ensure client events listener properly installed
@ruflin ruflin merged commit b2c7c18 into elastic:master Jul 11, 2017
@urso urso deleted the pipeline/filebeat branch February 19, 2019 18:49