
Introduce harvester_limit to limit number of harvesters #2417

Merged · 3 commits · Aug 30, 2016
8 changes: 5 additions & 3 deletions CHANGELOG.asciidoc
@@ -87,14 +87,16 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d

*Packetbeat*

- Add cassandra protocol analyzer to packetbeat. {pull}1959[1959]
- Match connections with IPv6 addresses to processes {pull}2254[2254]
- Add IP address to -devices command output {pull}2327[2327]
- Add cassandra protocol analyzer to packetbeat. {pull}1959[1959]
- Match connections with IPv6 addresses to processes {pull}2254[2254]
- Add IP address to -devices command output {pull}2327[2327]

*Topbeat*

*Filebeat*

- Add harvester_limit option {pull}2417[2417]

*Winlogbeat*


19 changes: 18 additions & 1 deletion filebeat/docs/reference/configuration/filebeat-options.asciidoc
@@ -209,7 +209,6 @@ The timestamp for closing a file does not depend on the modification time of the

You can use time strings like 2h (2 hours) and 5m (5 minutes). The default is 5m.


===== close_renamed

WARNING: Only use this option if you understand that data loss is a potential side effect.
@@ -410,6 +409,24 @@ the backoff algorithm is disabled, and the `backoff` value is used for waiting f
lines. The `backoff` value will be multiplied each time with the `backoff_factor` until
`max_backoff` is reached. The default is 2.

===== harvester_limit

EXPERIMENTAL

`harvester_limit` limits the number of harvesters that are started in parallel for one prospector. This directly limits the maximum number of file handlers that are open at the same time. The default is 0, which means there is no limit. This option is useful if the number of files to be harvested exceeds the open file handler limit of the operating system.

Because setting a harvester limit means that potentially not all files are opened in parallel, it is recommended to use this option in combination with the `close_*` options, so that harvesters are stopped more often and new files can be picked up.

Currently, when a harvester slot becomes free again, the harvester to start next is picked randomly. This means a harvester may be started for a file that was just closed and then updated again, instead of for a file that hasn't been harvested for a longer period of time.

This configuration option applies per prospector. It can also be used indirectly to give certain prospectors a higher priority by assigning them a higher harvester limit.
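The gating behavior described above can be sketched as follows. This is a simplified, illustrative sketch, not the actual Filebeat implementation: the `prospector` type and its fields here only mirror the names used in the real code.

```go
package main

import (
	"errors"
	"fmt"
)

// prospector tracks how many harvesters are currently running for it.
type prospector struct {
	harvesterLimit   uint64 // 0 means unlimited
	harvesterCounter uint64
}

var errLimitReached = errors.New("harvester limit reached")

// startHarvester refuses to start a new harvester once the limit is hit;
// the scan loop logs the error and retries the file on a later scan.
func (p *prospector) startHarvester(path string) error {
	if p.harvesterLimit > 0 && p.harvesterCounter >= p.harvesterLimit {
		return errLimitReached
	}
	p.harvesterCounter++
	fmt.Println("started harvester for", path)
	return nil
}

func main() {
	p := &prospector{harvesterLimit: 2}
	for _, f := range []string{"a.log", "b.log", "c.log"} {
		if err := p.startHarvester(f); err != nil {
			fmt.Printf("skipping %s: %v\n", f, err)
		}
	}
	// Output:
	// started harvester for a.log
	// started harvester for b.log
	// skipping c.log: harvester limit reached
}
```

With two prospectors configured, giving one a higher `harvester_limit` effectively prioritizes its files when file handles are scarce.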

[[configuration-global-options]]
=== Filebeat Global Configuration
4 changes: 4 additions & 0 deletions filebeat/etc/beat.full.yml
@@ -161,6 +161,10 @@ filebeat.prospectors:
# The backoff value will be multiplied each time with the backoff_factor until max_backoff is reached
#backoff_factor: 2

# Experimental: Max number of harvesters that are started in parallel.
# Default is 0 which means unlimited
#harvester_limit: 0

### Harvester closing options

# Close inactive closes the file handler after the predefined period.
4 changes: 4 additions & 0 deletions filebeat/filebeat.full.yml
@@ -161,6 +161,10 @@ filebeat.prospectors:
# The backoff value will be multiplied each time with the backoff_factor until max_backoff is reached
#backoff_factor: 2

# Experimental: Max number of harvesters that are started in parallel.
# Default is 0 which means unlimited
#harvester_limit: 0

### Harvester closing options

# Close inactive closes the file handler after the predefined period.
26 changes: 14 additions & 12 deletions filebeat/prospector/config.go
@@ -10,22 +10,24 @@ import (

var (
defaultConfig = prospectorConfig{
IgnoreOlder: 0,
ScanFrequency: 10 * time.Second,
InputType: cfg.DefaultInputType,
CleanInactive: 0,
CleanRemoved: false,
IgnoreOlder: 0,
ScanFrequency: 10 * time.Second,
InputType: cfg.DefaultInputType,
CleanInactive: 0,
CleanRemoved: false,
HarvesterLimit: 0,
}
)

type prospectorConfig struct {
ExcludeFiles []*regexp.Regexp `config:"exclude_files"`
IgnoreOlder time.Duration `config:"ignore_older"`
Paths []string `config:"paths"`
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
InputType string `config:"input_type"`
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
CleanRemoved bool `config:"clean_removed"`
ExcludeFiles []*regexp.Regexp `config:"exclude_files"`
IgnoreOlder time.Duration `config:"ignore_older"`
Paths []string `config:"paths"`
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
InputType string `config:"input_type"`
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"`
}

func (config *prospectorConfig) Validate() error {
30 changes: 21 additions & 9 deletions filebeat/prospector/prospector.go
@@ -5,6 +5,8 @@ import (
"sync"
"time"

"sync/atomic"

cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input"
@@ -14,14 +16,15 @@
)

type Prospector struct {
cfg *common.Config // Raw config
config prospectorConfig
prospectorer Prospectorer
spoolerChan chan *input.Event
harvesterChan chan *input.Event
done chan struct{}
states *file.States
wg sync.WaitGroup
cfg *common.Config // Raw config
config prospectorConfig
prospectorer Prospectorer
spoolerChan chan *input.Event
harvesterChan chan *input.Event
done chan struct{}
states *file.States
wg sync.WaitGroup
harvesterCounter uint64
}

type Prospectorer interface {
@@ -155,6 +158,11 @@ func (p *Prospector) createHarvester(state file.State) (*harvester.Harvester, er
}

func (p *Prospector) startHarvester(state file.State, offset int64) error {

if p.config.HarvesterLimit > 0 && atomic.LoadUint64(&p.harvesterCounter) >= p.config.HarvesterLimit {
we should add a comment about the atomic ops being safe, since startHarvester is not executed concurrently and atomic ops are only required for harvesters shutting down.

Member Author:

Comment added in separate commit. Can be squashed.

return fmt.Errorf("Harvester limit reached.")
}

state.Offset = offset
// Create harvester with state
h, err := p.createHarvester(state)
@@ -163,8 +171,12 @@ }
}

p.wg.Add(1)
atomic.AddUint64(&p.harvesterCounter, 1)
go func() {
defer p.wg.Done()
defer func() {
atomic.AddUint64(&p.harvesterCounter, ^uint64(0))
p.wg.Done()
}()
// Starts harvester and picks the right type. In case type is not set, set it to default (log)
h.Harvest()
}()
6 changes: 3 additions & 3 deletions filebeat/prospector/prospector_log.go
@@ -159,7 +159,7 @@ func (p *ProspectorLog) scan() {
logp.Debug("prospector", "Start harvester for new file: %s", newState.Source)
err := p.Prospector.startHarvester(newState, 0)
if err != nil {
logp.Err("Harvester could not be started on new file: %s", err)
logp.Err("Harvester could not be started on new file: %s, Err: %s", newState.Source, err)
}
} else {
p.harvestExistingFile(newState, lastState)
@@ -182,7 +182,7 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S
logp.Debug("prospector", "Resuming harvesting of file: %s, offset: %v", newState.Source, oldState.Offset)
err := p.Prospector.startHarvester(newState, oldState.Offset)
if err != nil {
logp.Err("Harvester could not be started on existing file: %s", err)
logp.Err("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err)
}
return
}
@@ -192,7 +192,7 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S
logp.Debug("prospector", "Old file was truncated. Starting from the beginning: %s", newState.Source)
err := p.Prospector.startHarvester(newState, 0)
if err != nil {
logp.Err("Harvester could not be started on truncated file: %s", err)
logp.Err("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err)
}

filesTrucated.Add(1)
1 change: 1 addition & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
@@ -25,6 +25,7 @@ filebeat.prospectors:
force_close_files: {{force_close_files}}
clean_inactive: {{clean_inactive}}
clean_removed: {{clean_removed}}
harvester_limit: {{harvester_limit | default(0) }}

{% if fields %}
fields:
49 changes: 48 additions & 1 deletion filebeat/tests/system/test_prospector.py
@@ -574,10 +574,57 @@ def test_skip_symlinks(self):
lambda: self.output_has(lines=1),
max_timeout=15)

time.sleep(5)
filebeat.check_kill_and_wait()

data = self.read_output()

# Make sure there is only one entry, means it didn't follow the symlink
assert len(data) == 1

def test_harvester_limit(self):
"""
Test if harvester_limit applies
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
harvester_limit=1,
close_inactive="1s",
scan_frequency="1s",
)

os.mkdir(self.working_dir + "/log/")
testfile1 = self.working_dir + "/log/test1.log"
testfile2 = self.working_dir + "/log/test2.log"
testfile3 = self.working_dir + "/log/test3.log"

with open(testfile1, 'w') as file:
file.write("Line1\n")

with open(testfile2, 'w') as file:
file.write("Line2\n")

with open(testfile3, 'w') as file:
file.write("Line3\n")

filebeat = self.start_beat()

# check that not all harvesters were started
self.wait_until(
lambda: self.log_contains("Harvester limit reached"),
max_timeout=10)

# wait for registry to be written
self.wait_until(
lambda: self.log_contains("Registry file updated"),
max_timeout=10)

# Make sure not all events were written so far
data = self.read_output()
assert len(data) < 3

self.wait_until(lambda: self.output_has(lines=3), max_timeout=15)

data = self.read_output()
assert len(data) == 3

filebeat.check_kill_and_wait()