Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #8973 to 6.x: Minor Journalbeat fixes and additions #9007

Merged
merged 1 commit into from
Nov 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions journalbeat/_meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ journalbeat.inputs:
# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
#max_backoff: 60s
#max_backoff: 20s

# Position to start reading from journal. Valid values: head, tail, cursor
seek: cursor
Expand All @@ -42,15 +42,3 @@ journalbeat.inputs:
# Name of the registry file. If a relative path is used, it is considered relative to the
# data path.
#registry_file: registry

# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
#max_backoff: 60s

# Position to start reading from all journal. Possible values: head, tail, cursor
#seek: head

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
#matches: []
169 changes: 169 additions & 0 deletions journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
{
"objects": [
{
"attributes": {
"columns": [
"@timestamp",
"host.name",
"message"
],
"description": "",
"hits": 0,
"kibanaSavedObjectMeta": {
"searchSourceJSON": {
"filter": [],
"highlightAll": true,
"index": "journalbeat-*",
"query": {
"language": "lucene",
"query": "process.name:systemd"
},
"version": true
}
},
"sort": [
"@timestamp",
"desc"
],
"title": "[Journalbeat] Systemd messages",
"version": 1
},
"id": "aa003e90-e2b9-11e8-9f52-734e93de180d",
"type": "search",
"updated_at": "2018-11-07T18:19:28.377Z",
"version": 1
},
{
"attributes": {
"columns": [
"@timestamp",
"host.name",
"journald.kernel.subsystem",
"message"
],
"description": "",
"hits": 0,
"kibanaSavedObjectMeta": {
"searchSourceJSON": {
"filter": [],
"highlightAll": true,
"index": "journalbeat-*",
"query": {
"language": "lucene",
"query": "syslog.facility:0 AND syslog.priority:\u003c4"
},
"version": true
}
},
"sort": [
"_score",
"desc"
],
"title": "[Journalbeat] Kernel errors",
"version": 1
},
"id": "5db75310-e2ba-11e8-9f52-734e93de180d",
"type": "search",
"updated_at": "2018-11-07T18:24:29.889Z",
"version": 1
},
{
"attributes": {
"columns": [
"@timestamp",
"host.name",
"process.name",
"message"
],
"description": "",
"hits": 0,
"kibanaSavedObjectMeta": {
"searchSourceJSON": {
"filter": [],
"highlightAll": true,
"index": "journalbeat-*",
"query": {
"language": "lucene",
"query": "syslog.facility:4"
},
"version": true
}
},
"sort": [
"_score",
"desc"
],
"title": "[Journalbeat] Login authorization",
"version": 1
},
"id": "82408120-e2ba-11e8-9f52-734e93de180d",
"type": "search",
"updated_at": "2018-11-07T18:26:05.348Z",
"version": 2
},
{
"attributes": {
"columns": [
"@timestamp",
"host.name",
"journald.kernel.subsystem",
"journald.kernel.device_node_path",
"message"
],
"description": "",
"hits": 0,
"kibanaSavedObjectMeta": {
"searchSourceJSON": {
"filter": [],
"highlightAll": true,
"index": "journalbeat-*",
"query": {
"language": "lucene",
"query": "journald.kernel.subsystem:usb OR journald.kernel.subsystem:hid"
},
"version": true
}
},
"sort": [
"_score",
"desc"
],
"title": "[Journalbeat] USB and HID messages",
"version": 1
},
"id": "f0232670-e2ba-11e8-9f52-734e93de180d",
"type": "search",
"updated_at": "2018-11-07T18:28:35.543Z",
"version": 1
},
{
"attributes": {
"description": "",
"hits": 0,
"kibanaSavedObjectMeta": {
"searchSourceJSON": {
"filter": [],
"query": {
"language": "lucene",
"query": ""
}
}
},
"optionsJSON": {
"darkTheme": false,
"hidePanelTitles": false,
"useMargins": true
},
"panelsJSON": null,
"timeRestore": false,
"title": "[Journalbeat] Overview",
"version": 1
},
"id": "f2de4440-e2b9-11e8-9f52-734e93de180d",
"type": "dashboard",
"updated_at": "2018-11-07T18:30:18.083Z",
"version": 2
}
],
"version": "7.0.0-alpha1-SNAPSHOT"
}
3 changes: 3 additions & 0 deletions journalbeat/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"gopkg.in/yaml.v2"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/paths"
)

// Checkpoint persists event log state information to disk.
Expand Down Expand Up @@ -87,6 +88,8 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp
save: make(chan JournalState, 1),
}

c.file = paths.Resolve(paths.Data, c.file)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could have been set directly on c initialization.


// Minimum batch size.
if c.maxUpdates < 1 {
c.maxUpdates = 1
Expand Down
53 changes: 42 additions & 11 deletions journalbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,56 @@
package config

import (
"time"
"fmt"

"github.com/elastic/beats/libbeat/common"
)

// SeekMode is specifies how a journal is read
type SeekMode uint8

// Config stores the configuration of Journalbeat
type Config struct {
Inputs []*common.Config `config:"inputs"`
RegistryFile string `config:"registry_file"`
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
Seek string `config:"seek"`
Matches []string `config:"include_matches"`
}

// DefaultConfig are the defaults of a Journalbeat instance
var DefaultConfig = Config{
RegistryFile: "registry",
Backoff: 1 * time.Second,
MaxBackoff: 60 * time.Second,
Seek: "cursor",
const (
// SeekInvalid is an invalid value for seek
SeekInvalid SeekMode = iota
// SeekHead option seeks to the head of a journal
SeekHead
// SeekTail option seeks to the tail of a journal
SeekTail
// SeekCursor option seeks to the position specified in the cursor
SeekCursor

seekHeadStr = "head"
seekTailStr = "tail"
seekCursorStr = "cursor"
)

var (
// DefaultConfig are the defaults of a Journalbeat instance
DefaultConfig = Config{
RegistryFile: "registry",
}

seekModes = map[string]SeekMode{
seekHeadStr: SeekHead,
seekTailStr: SeekTail,
seekCursorStr: SeekCursor,
}
)

// Unpack validates and unpack "seek" config option
func (m *SeekMode) Unpack(value string) error {
mode, ok := seekModes[value]
if !ok {
return fmt.Errorf("invalid seek mode '%s'", value)
}

*m = mode

return nil
}
33 changes: 7 additions & 26 deletions journalbeat/input/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package input

import (
"fmt"
"time"

"github.com/elastic/beats/journalbeat/config"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)
Expand All @@ -29,15 +29,13 @@ import (
type Config struct {
// Paths stores the paths to the journal files to be read.
Paths []string `config:"paths"`
// MaxBackoff is the limit of the backoff time.
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
// Backoff is the current interval to wait before
// attemting to read again from the journal.
BackoffFactor int `config:"backoff_factor" validate:"min=1"`
// BackoffFactor is the multiplier of Backoff.
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
// MaxBackoff is the limit of the backoff time.
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
// Seek is the method to read from journals.
Seek string `config:"seek"`
Seek config.SeekMode `config:"seek"`
// Matches store the key value pairs to match entries.
Matches []string `config:"include_matches"`

Expand All @@ -50,25 +48,8 @@ type Config struct {
var (
// DefaultConfig is the defaults for an inputs
DefaultConfig = Config{
Backoff: 1 * time.Second,
BackoffFactor: 2,
MaxBackoff: 60 * time.Second,
Seek: "cursor",
Backoff: 1 * time.Second,
MaxBackoff: 20 * time.Second,
Seek: config.SeekCursor,
}
)

// Validate check the configuration of the input.
func (c *Config) Validate() error {
correctSeek := false
for _, s := range []string{"cursor", "head", "tail"} {
if c.Seek == s {
correctSeek = true
}
}

if !correctSeek {
return fmt.Errorf("incorrect value for seek: %s. possible values: cursor, head, tail", c.Seek)
}

return nil
}
12 changes: 7 additions & 5 deletions journalbeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Input struct {
done chan struct{}
config Config
pipeline beat.Pipeline
client beat.Client
states map[string]checkpoint.JournalState
id uuid.UUID
logger *logp.Logger
Expand Down Expand Up @@ -120,7 +121,8 @@ func New(
// Run connects to the output, collects entries from the readers
// and then publishes the events.
func (i *Input) Run() {
client, err := i.pipeline.ConnectWith(beat.ClientConfig{
var err error
i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
EventMetadata: i.eventMeta,
Meta: nil,
Expand All @@ -133,13 +135,12 @@ func (i *Input) Run() {
i.logger.Error("Error connecting to output: %v", err)
return
}
defer client.Close()

i.publishAll(client)
i.publishAll()
}

// publishAll reads events from all readers and publishes them.
func (i *Input) publishAll(client beat.Client) {
func (i *Input) publishAll() {
out := make(chan *beat.Event)
defer close(out)

Expand Down Expand Up @@ -179,13 +180,14 @@ func (i *Input) publishAll(client beat.Client) {
case <-i.done:
return
case e := <-out:
client.Publish(*e)
i.client.Publish(*e)
}
}
}

// Stop stops all readers of the input.
func (i *Input) Stop() {
i.client.Close()
for _, r := range i.readers {
r.Close()
}
Expand Down
Loading