diff --git a/journalbeat/_meta/beat.yml b/journalbeat/_meta/beat.yml index c4e3c14db56..78c28ffe36d 100644 --- a/journalbeat/_meta/beat.yml +++ b/journalbeat/_meta/beat.yml @@ -21,7 +21,7 @@ journalbeat.inputs: # The number of seconds to wait before trying to read again from journals. #backoff: 1s # The maximum number of seconds to wait before attempting to read again from journals. - #max_backoff: 60s + #max_backoff: 20s # Position to start reading from journal. Valid values: head, tail, cursor seek: cursor @@ -42,15 +42,3 @@ journalbeat.inputs: # Name of the registry file. If a relative path is used, it is considered relative to the # data path. #registry_file: registry - - # The number of seconds to wait before trying to read again from journals. - #backoff: 1s - # The maximum number of seconds to wait before attempting to read again from journals. - #max_backoff: 60s - - # Position to start reading from all journal. Possible values: head, tail, cursor - #seek: head - - # Exact matching for field values of events. - # Matching for nginx entries: "systemd.unit=nginx" - #matches: [] diff --git a/journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json b/journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json new file mode 100644 index 00000000000..fc771e9bebd --- /dev/null +++ b/journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json @@ -0,0 +1,169 @@ +{ + "objects": [ + { + "attributes": { + "columns": [ + "@timestamp", + "host.name", + "message" + ], + "description": "", + "hits": 0, + "kibanaSavedObjectMeta": { + "searchSourceJSON": { + "filter": [], + "highlightAll": true, + "index": "journalbeat-*", + "query": { + "language": "lucene", + "query": "process.name:systemd" + }, + "version": true + } + }, + "sort": [ + "@timestamp", + "desc" + ], + "title": "[Journalbeat] Systemd messages", + "version": 1 + }, + "id": "aa003e90-e2b9-11e8-9f52-734e93de180d", + "type": "search", + "updated_at": "2018-11-07T18:19:28.377Z", + "version": 1 + }, + { + "attributes": { + "columns": [ + "@timestamp", + "host.name", + "journald.kernel.subsystem", + "message" + ], + "description": "", + "hits": 0, + "kibanaSavedObjectMeta": { + "searchSourceJSON": { + "filter": [], + "highlightAll": true, + "index": "journalbeat-*", + "query": { + "language": "lucene", + "query": "syslog.facility:0 AND syslog.priority:\u003c4" + }, + "version": true + } + }, + "sort": [ + "_score", + "desc" + ], + "title": "[Journalbeat] Kernel errors", + "version": 1 + }, + "id": "5db75310-e2ba-11e8-9f52-734e93de180d", + "type": "search", + "updated_at": "2018-11-07T18:24:29.889Z", + "version": 1 + }, + { + "attributes": { + "columns": [ + "@timestamp", + "host.name", + "process.name", + "message" + ], + "description": "", + "hits": 0, + "kibanaSavedObjectMeta": { + "searchSourceJSON": { + "filter": [], + "highlightAll": true, + "index": "journalbeat-*", + "query": { + "language": "lucene", + "query": "syslog.facility:4" + }, + "version": true + } + }, + "sort": [ + "_score", + "desc" + ], + "title": "[Journalbeat] Login authorization", + "version": 1 + }, + "id": "82408120-e2ba-11e8-9f52-734e93de180d", + "type": "search", + "updated_at": "2018-11-07T18:26:05.348Z", + "version": 2 + }, + { + "attributes": { + "columns": [ + "@timestamp", + "host.name", + "journald.kernel.subsystem", + "journald.kernel.device_node_path", + "message" + ], + "description": "", + "hits": 0, + "kibanaSavedObjectMeta": { + "searchSourceJSON": { + "filter": [], + "highlightAll": true, + "index": "journalbeat-*", + "query": { + "language": "lucene", + "query": "journald.kernel.subsystem:usb OR journald.kernel.subsystem:hid" + }, + "version": true + } + }, + "sort": [ + "_score", + "desc" + ], + "title": "[Journalbeat] USB and HID messages", + "version": 1 + }, + "id": "f0232670-e2ba-11e8-9f52-734e93de180d", + "type": "search", + "updated_at": "2018-11-07T18:28:35.543Z", + "version": 1 + }, + { + "attributes": { + "description": "", + "hits": 0, + "kibanaSavedObjectMeta": { + "searchSourceJSON": { + "filter": [], + "query": { + "language": "lucene", + "query": "" + } + } + }, + "optionsJSON": { + "darkTheme": false, + "hidePanelTitles": false, + "useMargins": true + }, + "panelsJSON": null, + "timeRestore": false, + "title": "[Journalbeat] Overview", + "version": 1 + }, + "id": "f2de4440-e2b9-11e8-9f52-734e93de180d", + "type": "dashboard", + "updated_at": "2018-11-07T18:30:18.083Z", + "version": 2 + } + ], + "version": "7.0.0-alpha1-SNAPSHOT" +} diff --git a/journalbeat/checkpoint/checkpoint.go b/journalbeat/checkpoint/checkpoint.go index f2c3bfacdab..0f29861040b 100644 --- a/journalbeat/checkpoint/checkpoint.go +++ b/journalbeat/checkpoint/checkpoint.go @@ -32,6 +32,7 @@ import ( "gopkg.in/yaml.v2" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/paths" ) // Checkpoint persists event log state information to disk. @@ -87,6 +88,8 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp save: make(chan JournalState, 1), } + c.file = paths.Resolve(paths.Data, c.file) + // Minimum batch size. if c.maxUpdates < 1 { c.maxUpdates = 1 diff --git a/journalbeat/config/config.go b/journalbeat/config/config.go index a2c5b69d951..395bf13ec9c 100644 --- a/journalbeat/config/config.go +++ b/journalbeat/config/config.go @@ -21,25 +21,56 @@ package config import ( - "time" + "fmt" "github.com/elastic/beats/libbeat/common" ) +// SeekMode is specifies how a journal is read +type SeekMode uint8 + // Config stores the configuration of Journalbeat type Config struct { Inputs []*common.Config `config:"inputs"` RegistryFile string `config:"registry_file"` - Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"` - MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` - Seek string `config:"seek"` - Matches []string `config:"include_matches"` } -// DefaultConfig are the defaults of a Journalbeat instance -var DefaultConfig = Config{ - RegistryFile: "registry", - Backoff: 1 * time.Second, - MaxBackoff: 60 * time.Second, - Seek: "cursor", +const ( + // SeekInvalid is an invalid value for seek + SeekInvalid SeekMode = iota + // SeekHead option seeks to the head of a journal + SeekHead + // SeekTail option seeks to the tail of a journal + SeekTail + // SeekCursor option seeks to the position specified in the cursor + SeekCursor + + seekHeadStr = "head" + seekTailStr = "tail" + seekCursorStr = "cursor" +) + +var ( + // DefaultConfig are the defaults of a Journalbeat instance + DefaultConfig = Config{ + RegistryFile: "registry", + } + + seekModes = map[string]SeekMode{ + seekHeadStr: SeekHead, + seekTailStr: SeekTail, + seekCursorStr: SeekCursor, + } +) + +// Unpack validates and unpack "seek" config option +func (m *SeekMode) Unpack(value string) error { + mode, ok := seekModes[value] + if !ok { + return fmt.Errorf("invalid seek mode '%s'", value) + } + + *m = mode + + return nil } diff --git a/journalbeat/input/config.go b/journalbeat/input/config.go index 6383998bd1b..5bdbfcd2ec9 100644 --- a/journalbeat/input/config.go +++ b/journalbeat/input/config.go @@ -18,9 +18,9 @@ package input import ( - "fmt" "time" + "github.com/elastic/beats/journalbeat/config" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/processors" ) @@ -29,15 +29,13 @@ import ( type Config struct { // Paths stores the paths to the journal files to be read. Paths []string `config:"paths"` - // MaxBackoff is the limit of the backoff time. - Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"` // Backoff is the current interval to wait before // attemting to read again from the journal. - BackoffFactor int `config:"backoff_factor" validate:"min=1"` - // BackoffFactor is the multiplier of Backoff. + Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"` + // MaxBackoff is the limit of the backoff time. MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` // Seek is the method to read from journals. - Seek string `config:"seek"` + Seek config.SeekMode `config:"seek"` // Matches store the key value pairs to match entries. Matches []string `config:"include_matches"` @@ -50,25 +48,8 @@ type Config struct { var ( // DefaultConfig is the defaults for an inputs DefaultConfig = Config{ - Backoff: 1 * time.Second, - BackoffFactor: 2, - MaxBackoff: 60 * time.Second, - Seek: "cursor", + Backoff: 1 * time.Second, + MaxBackoff: 20 * time.Second, + Seek: config.SeekCursor, } ) - -// Validate check the configuration of the input. -func (c *Config) Validate() error { - correctSeek := false - for _, s := range []string{"cursor", "head", "tail"} { - if c.Seek == s { - correctSeek = true - } - } - - if !correctSeek { - return fmt.Errorf("incorrect value for seek: %s. possible values: cursor, head, tail", c.Seek) - } - - return nil -} diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index 42d8a0ea394..094d169a4ca 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -37,6 +37,7 @@ type Input struct { done chan struct{} config Config pipeline beat.Pipeline + client beat.Client states map[string]checkpoint.JournalState id uuid.UUID logger *logp.Logger @@ -120,7 +121,8 @@ func New( // Run connects to the output, collects entries from the readers // and then publishes the events. func (i *Input) Run() { - client, err := i.pipeline.ConnectWith(beat.ClientConfig{ + var err error + i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{ PublishMode: beat.GuaranteedSend, EventMetadata: i.eventMeta, Meta: nil, @@ -133,13 +135,12 @@ func (i *Input) Run() { i.logger.Error("Error connecting to output: %v", err) return } - defer client.Close() - i.publishAll(client) + i.publishAll() } // publishAll reads events from all readers and publishes them. -func (i *Input) publishAll(client beat.Client) { +func (i *Input) publishAll() { out := make(chan *beat.Event) defer close(out) @@ -179,13 +180,14 @@ func (i *Input) publishAll(client beat.Client) { case <-i.done: return case e := <-out: - client.Publish(*e) + i.client.Publish(*e) } } } // Stop stops all readers of the input. func (i *Input) Stop() { + i.client.Close() for _, r := range i.readers { r.Close() } diff --git a/journalbeat/journalbeat.reference.yml b/journalbeat/journalbeat.reference.yml index edc5af4b179..46267fa6b09 100644 --- a/journalbeat/journalbeat.reference.yml +++ b/journalbeat/journalbeat.reference.yml @@ -21,7 +21,7 @@ journalbeat.inputs: # The number of seconds to wait before trying to read again from journals. #backoff: 1s # The maximum number of seconds to wait before attempting to read again from journals. - #max_backoff: 60s + #max_backoff: 20s # Position to start reading from journal. Valid values: head, tail, cursor seek: cursor @@ -43,18 +43,6 @@ journalbeat.inputs: # data path. #registry_file: registry - # The number of seconds to wait before trying to read again from journals. - #backoff: 1s - # The maximum number of seconds to wait before attempting to read again from journals. - #max_backoff: 60s - - # Position to start reading from all journal. Possible values: head, tail, cursor - #seek: head - - # Exact matching for field values of events. - # Matching for nginx entries: "systemd.unit=nginx" - #matches: [] - #================================ General ====================================== # The name of the shipper that publishes the network data. It can be used to group diff --git a/journalbeat/journalbeat.yml b/journalbeat/journalbeat.yml index 753c6ef4f8f..b2ab42fb81b 100644 --- a/journalbeat/journalbeat.yml +++ b/journalbeat/journalbeat.yml @@ -21,7 +21,7 @@ journalbeat.inputs: # The number of seconds to wait before trying to read again from journals. #backoff: 1s # The maximum number of seconds to wait before attempting to read again from journals. - #max_backoff: 60s + #max_backoff: 20s # Position to start reading from journal. Valid values: head, tail, cursor seek: cursor @@ -43,18 +43,6 @@ journalbeat.inputs: # data path. #registry_file: registry - # The number of seconds to wait before trying to read again from journals. - #backoff: 1s - # The maximum number of seconds to wait before attempting to read again from journals. - #max_backoff: 60s - - # Position to start reading from all journal. Possible values: head, tail, cursor - #seek: head - - # Exact matching for field values of events. - # Matching for nginx entries: "systemd.unit=nginx" - #matches: [] - #================================ General ===================================== # The name of the shipper that publishes the network data. It can be used to group diff --git a/journalbeat/reader/config.go b/journalbeat/reader/config.go index b81005ec926..7d52ff7422d 100644 --- a/journalbeat/reader/config.go +++ b/journalbeat/reader/config.go @@ -17,7 +17,11 @@ package reader -import "time" +import ( + "time" + + "github.com/elastic/beats/journalbeat/config" +) // Config stores the options of a reder. type Config struct { @@ -25,7 +29,7 @@ type Config struct { Path string // Seek specifies the seeking stategy. // Possible values: head, tail, cursor. - Seek string + Seek config.SeekMode // MaxBackoff is the limit of the backoff time. MaxBackoff time.Duration // Backoff is the current interval to wait before diff --git a/journalbeat/reader/journal.go b/journalbeat/reader/journal.go index f7afc30a4d0..8df68170fcd 100644 --- a/journalbeat/reader/journal.go +++ b/journalbeat/reader/journal.go @@ -32,6 +32,7 @@ import ( "github.com/elastic/beats/journalbeat/checkpoint" "github.com/elastic/beats/journalbeat/cmd/instance" + "github.com/elastic/beats/journalbeat/config" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -142,7 +143,8 @@ func setupMatches(j *sdjournal.Journal, matches []string) error { // seek seeks to the position determined by the coniguration and cursor state. func (r *Reader) seek(cursor string) { - if r.config.Seek == "cursor" { + switch r.config.Seek { + case config.SeekCursor: if cursor == "" { r.journal.SeekHead() r.logger.Debug("Seeking method set to cursor, but no state is saved for reader. Starting to read from the beginning") @@ -154,12 +156,15 @@ func (r *Reader) seek(cursor string) { r.logger.Error("Error while seeking to cursor") } r.logger.Debug("Seeked to position defined in cursor") - } else if r.config.Seek == "tail" { + case config.SeekTail: r.journal.SeekTail() + r.journal.Next() r.logger.Debug("Tailing the journal file") - } else if r.config.Seek == "head" { + case config.SeekHead: r.journal.SeekHead() r.logger.Debug("Reading from the beginning of the journal file") + default: + r.logger.Error("Invalid seeking mode") } } @@ -220,7 +225,7 @@ func (r *Reader) toEvent(entry *sdjournal.JournalEntry) *beat.Event { } if len(custom) != 0 { - fields["custom"] = custom + fields.Put("journald.custom", custom) } state := checkpoint.JournalState{ diff --git a/journalbeat/reader/journal_test.go b/journalbeat/reader/journal_test.go index 5170afd2593..8c37026f8ba 100644 --- a/journalbeat/reader/journal_test.go +++ b/journalbeat/reader/journal_test.go @@ -65,8 +65,10 @@ func TestToEvent(t *testing.T) { }, }, expectedFields: common.MapStr{ - "custom": common.MapStr{ - "my_custom_field": "value", + "journald": common.MapStr{ + "custom": common.MapStr{ + "my_custom_field": "value", + }, }, }, },