Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #9106 to 6.x: Add missing journalbeat non breaking fixes #9275

Merged
merged 5 commits into from
Nov 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ https://github.com/elastic/beats/compare/v6.5.0...6.x[Check the HEAD diff]

*Journalbeat*

- Add missing journalbeat non breaking fixes. {pull}9106[9106]

*Metricbeat*

- Add missing namespace field in http server metricset {pull}7890[7890]
Expand Down
14 changes: 13 additions & 1 deletion journalbeat/_meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ journalbeat.inputs:
# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
#max_backoff: 20s
#max_backoff: 60s

# Position to start reading from journal. Valid values: head, tail, cursor
seek: cursor
Expand All @@ -42,3 +42,15 @@ journalbeat.inputs:
# Name of the registry file. If a relative path is used, it is considered relative to the
# data path.
#registry_file: registry

# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
#max_backoff: 60s

# Position to start reading from all journal. Possible values: head, tail, cursor
#seek: head

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
#matches: []
22 changes: 19 additions & 3 deletions journalbeat/beater/journalbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ import (
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"

"github.com/elastic/beats/journalbeat/config"
conf "github.com/elastic/beats/journalbeat/config"
)

// Journalbeat instance
type Journalbeat struct {
inputs []*input.Input
done chan struct{}
config config.Config
config conf.Config

pipeline beat.Pipeline
checkpoint *checkpoint.Checkpoint
Expand All @@ -48,7 +48,23 @@ type Journalbeat struct {
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
cfgwarn.Experimental("Journalbeat is experimental.")

config := config.DefaultConfig
if cfg.HasField("seek") {
cfgwarn.Deprecate("7.0.0", "global seek is deprecated, Use seek on input level instead.")
}

if cfg.HasField("backoff") {
cfgwarn.Deprecate("7.0.0", "global backoff is deprecated, Use backoff on input level instead.")
}

if cfg.HasField("max_backoff") {
cfgwarn.Deprecate("7.0.0", "global max_backoff is deprecated, Use max_backoff on input level instead.")
}

if cfg.HasField("include_matches") {
cfgwarn.Deprecate("7.0.0", "global include_matches is deprecated, Use include_matches on input level instead.")
}

config := conf.DefaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("error reading config file: %v", err)
}
Expand Down
3 changes: 0 additions & 3 deletions journalbeat/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"gopkg.in/yaml.v2"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/paths"
)

// Checkpoint persists event log state information to disk.
Expand Down Expand Up @@ -88,8 +87,6 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp
save: make(chan JournalState, 1),
}

c.file = paths.Resolve(paths.Data, c.file)

// Minimum batch size.
if c.maxUpdates < 1 {
c.maxUpdates = 1
Expand Down
10 changes: 9 additions & 1 deletion journalbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package config

import (
"fmt"
"time"

"github.com/elastic/beats/libbeat/common"
)
Expand All @@ -33,6 +34,10 @@ type SeekMode uint8
type Config struct {
Inputs []*common.Config `config:"inputs"`
RegistryFile string `config:"registry_file"`
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
Seek SeekMode `config:"seek"`
Matches []string `config:"include_matches"`
}

const (
Expand All @@ -50,10 +55,13 @@ const (
seekCursorStr = "cursor"
)

// DefaultConfig are the defaults of a Journalbeat instance
var (
// DefaultConfig are the defaults of a Journalbeat instance
DefaultConfig = Config{
RegistryFile: "registry",
Backoff: 1 * time.Second,
MaxBackoff: 60 * time.Second,
Seek: SeekCursor,
}

seekModes = map[string]SeekMode{
Expand Down
13 changes: 8 additions & 5 deletions journalbeat/input/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ import (
type Config struct {
// Paths stores the paths to the journal files to be read.
Paths []string `config:"paths"`
// MaxBackoff is the limit of the backoff time.
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
// Backoff is the current interval to wait before
// attemting to read again from the journal.
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
// MaxBackoff is the limit of the backoff time.
BackoffFactor int `config:"backoff_factor" validate:"min=1"`
// BackoffFactor is the multiplier of Backoff.
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
// Seek is the method to read from journals.
Seek config.SeekMode `config:"seek"`
Expand All @@ -48,8 +50,9 @@ type Config struct {
var (
// DefaultConfig is the defaults for an inputs
DefaultConfig = Config{
Backoff: 1 * time.Second,
MaxBackoff: 20 * time.Second,
Seek: config.SeekCursor,
Backoff: 1 * time.Second,
BackoffFactor: 2,
MaxBackoff: 60 * time.Second,
Seek: config.SeekCursor,
}
)
14 changes: 13 additions & 1 deletion journalbeat/journalbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ journalbeat.inputs:
# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
#max_backoff: 20s
#max_backoff: 60s

# Position to start reading from journal. Valid values: head, tail, cursor
seek: cursor
Expand All @@ -43,6 +43,18 @@ journalbeat.inputs:
# data path.
#registry_file: registry

# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
#max_backoff: 60s

# Position to start reading from all journal. Possible values: head, tail, cursor
#seek: head

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
#matches: []

#================================ General ======================================

# The name of the shipper that publishes the network data. It can be used to group
Expand Down
14 changes: 13 additions & 1 deletion journalbeat/journalbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ journalbeat.inputs:
# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
#max_backoff: 20s
#max_backoff: 60s

# Position to start reading from journal. Valid values: head, tail, cursor
seek: cursor
Expand All @@ -43,6 +43,18 @@ journalbeat.inputs:
# data path.
#registry_file: registry

# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
#max_backoff: 60s

# Position to start reading from all journal. Possible values: head, tail, cursor
#seek: head

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
#matches: []

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
Expand Down
3 changes: 1 addition & 2 deletions journalbeat/reader/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ func (r *Reader) seek(cursor string) {
r.logger.Debug("Seeked to position defined in cursor")
case config.SeekTail:
r.journal.SeekTail()
r.journal.Next()
r.logger.Debug("Tailing the journal file")
case config.SeekHead:
r.journal.SeekHead()
Expand Down Expand Up @@ -225,7 +224,7 @@ func (r *Reader) toEvent(entry *sdjournal.JournalEntry) *beat.Event {
}

if len(custom) != 0 {
fields.Put("journald.custom", custom)
fields["custom"] = custom
}

state := checkpoint.JournalState{
Expand Down
6 changes: 2 additions & 4 deletions journalbeat/reader/journal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,8 @@ func TestToEvent(t *testing.T) {
},
},
expectedFields: common.MapStr{
"journald": common.MapStr{
"custom": common.MapStr{
"my_custom_field": "value",
},
"custom": common.MapStr{
"my_custom_field": "value",
},
},
},
Expand Down