Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cursor_seek_fallback option #9234

Merged
merged 2 commits into from
Nov 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha1...master[Check the HEAD d

*Journalbeat*

- Add cursor_seek_fallback option. {pull}9234[9234]

*Metricbeat*

- Add setting to disable docker cpu metrics per core. {pull}9194[9194]
Expand Down
2 changes: 2 additions & 0 deletions journalbeat/_meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ journalbeat.inputs:

# Position to start reading from journal. Valid values: head, tail, cursor
seek: cursor
# Fallback position if no cursor data is available.
#cursor_seek_fallback: head

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
Expand Down
9 changes: 6 additions & 3 deletions journalbeat/input/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type Config struct {
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
// Seek is the method to read from journals.
Seek config.SeekMode `config:"seek"`
// CursorSeekFallback sets where to seek if registry file is not available.
CursorSeekFallback config.SeekMode `config:"cursor_seek_fallback"`
// Matches store the key value pairs to match entries.
Matches []string `config:"include_matches"`

Expand All @@ -48,8 +50,9 @@ type Config struct {
var (
// DefaultConfig is the defaults for an inputs
DefaultConfig = Config{
Backoff: 1 * time.Second,
MaxBackoff: 20 * time.Second,
Seek: config.SeekCursor,
Backoff: 1 * time.Second,
MaxBackoff: 20 * time.Second,
Seek: config.SeekCursor,
CursorSeekFallback: config.SeekHead,
}
)
22 changes: 12 additions & 10 deletions journalbeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ func New(
var readers []*reader.Reader
if len(config.Paths) == 0 {
cfg := reader.Config{
Path: reader.LocalSystemJournalID, // used to identify the state in the registry
Backoff: config.Backoff,
MaxBackoff: config.MaxBackoff,
Seek: config.Seek,
Matches: config.Matches,
Path: reader.LocalSystemJournalID, // used to identify the state in the registry
Backoff: config.Backoff,
MaxBackoff: config.MaxBackoff,
Seek: config.Seek,
CursorSeekFallback: config.CursorSeekFallback,
Matches: config.Matches,
}

state := states[reader.LocalSystemJournalID]
Expand All @@ -84,11 +85,12 @@ func New(

for _, p := range config.Paths {
cfg := reader.Config{
Path: p,
Backoff: config.Backoff,
MaxBackoff: config.MaxBackoff,
Seek: config.Seek,
Matches: config.Matches,
Path: p,
Backoff: config.Backoff,
MaxBackoff: config.MaxBackoff,
Seek: config.Seek,
CursorSeekFallback: config.CursorSeekFallback,
Matches: config.Matches,
}
state := states[p]
r, err := reader.New(cfg, done, state, logger)
Expand Down
2 changes: 2 additions & 0 deletions journalbeat/journalbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ journalbeat.inputs:

# Position to start reading from journal. Valid values: head, tail, cursor
seek: cursor
# Fallback position if no cursor data is available.
#cursor_seek_fallback: head

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
Expand Down
2 changes: 2 additions & 0 deletions journalbeat/journalbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ journalbeat.inputs:

# Position to start reading from journal. Valid values: head, tail, cursor
seek: cursor
# Fallback position if no cursor data is available.
#cursor_seek_fallback: head

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
Expand Down
2 changes: 2 additions & 0 deletions journalbeat/reader/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Config struct {
// Seek specifies the seeking stategy.
// Possible values: head, tail, cursor.
Seek config.SeekMode
// CursorSeekFallback sets where to seek if registry file is not available.
CursorSeekFallback config.SeekMode
// MaxBackoff is the limit of the backoff time.
MaxBackoff time.Duration
// Backoff is the current interval to wait before
Expand Down
12 changes: 10 additions & 2 deletions journalbeat/reader/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,16 @@ func (r *Reader) seek(cursor string) {
switch r.config.Seek {
case config.SeekCursor:
if cursor == "" {
r.journal.SeekHead()
r.logger.Debug("Seeking method set to cursor, but no state is saved for reader. Starting to read from the beginning")
switch r.config.CursorSeekFallback {
case config.SeekHead:
r.journal.SeekHead()
r.logger.Debug("Seeking method set to cursor, but no state is saved for reader. Starting to read from the beginning")
case config.SeekTail:
r.journal.SeekTail()
r.logger.Debug("Seeking method set to cursor, but no state is saved for reader. Starting to read from the end")
default:
r.logger.Error("Invalid option for cursor_seek_fallback")
}
return
}
r.journal.SeekCursor(cursor)
Expand Down
5 changes: 4 additions & 1 deletion journalbeat/tests/system/config/journalbeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
journalbeat.inputs:
- paths: [{{ journal_path }}]
seek: {{ seek_method }}
matches: [{{ matches }}]
{% if cursor_seek_fallback %}
cursor_seek_fallback: {{ cursor_seek_fallback }}
{% endif %}
include_matches: [{{ matches }}]

journalbeat.registry: {{ registry_file }}

Expand Down
33 changes: 31 additions & 2 deletions journalbeat/tests/system/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ def test_start_with_local_journal(self):
)
journalbeat_proc = self.start_beat()

self.wait_until(lambda: self.log_contains(
"journalbeat is running"), max_timeout=10)
self.wait_until(lambda: self.log_contains("journalbeat is running"))

exit_code = journalbeat_proc.kill_and_wait()
assert exit_code == 0
Expand All @@ -33,6 +32,7 @@ def test_start_with_journal_directory(self):

self.render_config_template(
journal_path=self.beat_path + "/tests/system/input/",
seek_method="tail",
path=os.path.abspath(self.working_dir) + "/log/*"
)
journalbeat_proc = self.start_beat()
Expand Down Expand Up @@ -78,6 +78,35 @@ def test_start_with_selected_journal_file(self):
exit_code = journalbeat_proc.kill_and_wait()
assert exit_code == 0

@unittest.skipUnless(sys.platform.startswith("linux"), "Journald only on Linux")
def test_start_with_selected_journal_file_with_cursor_fallback(self):
"""
Journalbeat is able to open a journal file and start to read it from the position configured by seek and cursor_seek_fallback.
"""

self.render_config_template(
journal_path=self.beat_path + "/tests/system/input/test.journal",
seek_method="cursor",
cursor_seek_fallback="tail",
path=os.path.abspath(self.working_dir) + "/log/*"
)
journalbeat_proc = self.start_beat()

required_log_snippets = [
# journalbeat can be started
"journalbeat is running",
# journalbeat can seek to the position defined in cursor_seek_fallback.
"Seeking method set to cursor, but no state is saved for reader. Starting to read from the end",
# message can be read from test journal
"\"message\": \"thinkpad_acpi: please report the conditions when this event happened to ibm-acpi-devel@lists.sourceforge.net\"",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where have you got the test message from :)

]
for snippet in required_log_snippets:
self.wait_until(lambda: self.log_contains(snippet),
name="Line in '{}' Journalbeat log".format(snippet))

exit_code = journalbeat_proc.kill_and_wait()
assert exit_code == 0

@unittest.skipUnless(sys.platform.startswith("linux"), "Journald only on Linux")
def test_read_events_with_existing_registry(self):
"""
Expand Down