Skip to content

Commit

Permalink
Configurable line terminator (#11015)
Browse files Browse the repository at this point in the history
This PR adds a new input configuration option named `line_terminator`:

```
# Characters which separate the lines. Valid values: auto, line_feed, vertical_tab, form_feed,
# carriage_return, carriage_return_line_feed, next_line, line_separator, paragraph_separator.
#line_terminator: auto
```
The option `auto` tells Filebeat to use our current hybrid new line finder approach. Thus, we can avoid introducing a breaking change.

It also contains a minor refactoring in the `readfile` package. I have created a new type `Config` which stores the configuration of the readers of the package. This eliminates a long list of parameters in the constructors of `EncodeReader` and `LineReader`.


Closes #5500
  • Loading branch information
kvch committed Apr 15, 2019
1 parent 2499449 commit ebc7da7
Show file tree
Hide file tree
Showing 17 changed files with 253 additions and 70 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- New Filebeat coredns module to ingest coredns logs. It supports both native coredns deployment and coredns deployment in kubernetes. {pull}11200[11200]
- New module for Cisco ASA logs. {issue}9200[9200] {pull}11171[11171]
- Added support for Cisco ASA fields to the netflow input. {pull}11201[11201]
- Configurable line terminator. {pull}11015[11015]

*Heartbeat*

Expand Down
4 changes: 4 additions & 0 deletions filebeat/_meta/common.reference.inputs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ filebeat.inputs:
# This is especially useful for multiline log messages which can get large.
#max_bytes: 10485760

# Characters which separate the lines. Valid values: auto, line_feed, vertical_tab, form_feed,
# carriage_return, carriage_return_line_feed, next_line, line_separator, paragraph_separator.
#line_terminator: auto

### Recursive glob configuration

# Expand "**" patterns into regular glob patterns.
Expand Down
4 changes: 4 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,10 @@ filebeat.inputs:
# This is especially useful for multiline log messages which can get large.
#max_bytes: 10485760

# Characters which separate the lines. Valid values: auto, line_feed, vertical_tab, form_feed,
# carriage_return, carriage_return_line_feed, next_line, line_separator, paragraph_separator.
#line_terminator: auto

### Recursive glob configuration

# Expand "**" patterns into regular glob patterns.
Expand Down
17 changes: 10 additions & 7 deletions filebeat/input/log/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/elastic/beats/libbeat/common/match"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/reader/multiline"
"github.com/elastic/beats/libbeat/reader/readfile"
"github.com/elastic/beats/libbeat/reader/readjson"
)

Expand All @@ -55,8 +56,9 @@ var (
RecursiveGlob: true,

// Harvester
BufferSize: 16 * humanize.KiByte,
MaxBytes: 10 * humanize.MiByte,
BufferSize: 16 * humanize.KiByte,
MaxBytes: 10 * humanize.MiByte,
LineTerminator: readfile.AutoLineTerminator,
LogConfig: LogConfig{
Backoff: 1 * time.Second,
BackoffFactor: 2,
Expand Down Expand Up @@ -96,11 +98,12 @@ type config struct {
ScanOrder string `config:"scan.order"`
ScanSort string `config:"scan.sort"`

ExcludeLines []match.Matcher `config:"exclude_lines"`
IncludeLines []match.Matcher `config:"include_lines"`
MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"`
Multiline *multiline.Config `config:"multiline"`
JSON *readjson.Config `config:"json"`
LineTerminator readfile.LineTerminator `config:"line_terminator"`
ExcludeLines []match.Matcher `config:"exclude_lines"`
IncludeLines []match.Matcher `config:"include_lines"`
MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"`
Multiline *multiline.Config `config:"multiline"`
JSON *readjson.Config `config:"json"`

// Hidden on purpose, used by the docker input:
DockerJSON *struct {
Expand Down
10 changes: 8 additions & 2 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ func (h *Harvester) Setup() error {
return fmt.Errorf("Harvester setup failed. Unexpected encoding line reader error: %s", err)
}

logp.Debug("harvester", "Harvester setup successful. Line terminator: %d", h.config.LineTerminator)

return nil
}

Expand Down Expand Up @@ -564,7 +566,11 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
return nil, err
}

r, err = readfile.NewEncodeReader(reader, h.encoding, h.config.BufferSize)
r, err = readfile.NewEncodeReader(reader, readfile.Config{
Codec: h.encoding,
BufferSize: h.config.BufferSize,
Terminator: h.config.LineTerminator,
})
if err != nil {
return nil, err
}
Expand All @@ -578,7 +584,7 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
r = readjson.NewJSONReader(r, h.config.JSON)
}

r = readfile.NewStripNewline(r)
r = readfile.NewStripNewline(r, h.config.LineTerminator)

if h.config.Multiline != nil {
r, err = multiline.New(r, "\n", h.config.MaxBytes, h.config.Multiline)
Expand Down
6 changes: 4 additions & 2 deletions filebeat/input/log/harvester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/reader"
"github.com/elastic/beats/libbeat/reader/readfile"
"github.com/elastic/beats/libbeat/reader/readfile/encoding"
)

Expand Down Expand Up @@ -82,8 +83,9 @@ func TestReadLine(t *testing.T) {
MaxBackoff: 1 * time.Second,
BackoffFactor: 2,
},
BufferSize: 100,
MaxBytes: 1000,
BufferSize: 100,
MaxBytes: 1000,
LineTerminator: readfile.LineFeed,
},
source: source,
}
Expand Down
8 changes: 6 additions & 2 deletions filebeat/scripts/tester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,16 @@ func getLogsFromFile(logfile string, conf *logReaderConfig) ([]string, error) {
}

var r reader.Reader
r, err = readfile.NewEncodeReader(f, enc, 4096)
r, err = readfile.NewEncodeReader(f, readfile.Config{
Codec: enc,
BufferSize: 4096,
Terminator: readfile.LineFeed,
})
if err != nil {
return nil, err
}

r = readfile.NewStripNewline(r)
r = readfile.NewStripNewline(r, readfile.LineFeed)

if conf.multiPattern != "" {
p, err := match.Compile(conf.multiPattern)
Expand Down
1 change: 1 addition & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ filebeat.{{input_config | default("inputs")}}:
harvester_buffer_size: {{harvester_buffer_size}}
encoding: {{encoding | default("utf-8") }}
tail_files: {{tail_files}}
line_terminator: {{ line_terminator }}
backoff: 0.1s
backoff_factor: 1
max_backoff: 0.1s
Expand Down
1 change: 1 addition & 0 deletions filebeat/tests/system/input/test-newline.log
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
hello world goodbye world
8 changes: 6 additions & 2 deletions libbeat/reader/multiline/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,16 @@ func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reade
}

var r reader.Reader
r, err = readfile.NewEncodeReader(in, enc, 4096)
r, err = readfile.NewEncodeReader(in, readfile.Config{
Codec: enc,
BufferSize: 4096,
Terminator: readfile.LineFeed,
})
if err != nil {
t.Fatalf("Failed to initialize line reader: %v", err)
}

r, err = New(readfile.NewStripNewline(r), "\n", 1<<20, &cfg)
r, err = New(readfile.NewStripNewline(r, readfile.LineFeed), "\n", 1<<20, &cfg)
if err != nil {
t.Fatalf("failed to initialize reader: %v", err)
}
Expand Down
16 changes: 10 additions & 6 deletions libbeat/reader/readfile/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,18 @@ type EncoderReader struct {
reader *LineReader
}

// Config stores the configuration for the readers required to read
// a file line by line
type Config struct {
// Codec is the encoding of the input. NewLineReader uses it to build the
// encoder that converts the line terminator into the input's encoding and
// the decoder stored on the LineReader.
Codec encoding.Encoding
// BufferSize is copied into the LineReader as its buffer size.
// NOTE(review): presumably a size in bytes -- confirm against LineReader.
BufferSize int
// Terminator selects the line terminator sequence. NewLineReader rejects
// values that have no entry in lineTerminatorCharacters.
Terminator LineTerminator
}

// NewEncodeReader creates a new Encode reader from input reader by applying
// the given codec.
func NewEncodeReader(
r io.Reader,
codec encoding.Encoding,
bufferSize int,
) (EncoderReader, error) {
eReader, err := NewLineReader(r, codec, bufferSize)
func NewEncodeReader(r io.Reader, config Config) (EncoderReader, error) {
eReader, err := NewLineReader(r, config)
return EncoderReader{eReader}, err
}

Expand Down
35 changes: 21 additions & 14 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
package readfile

import (
"bytes"
"fmt"
"io"

"golang.org/x/text/encoding"
"golang.org/x/text/transform"

"github.com/elastic/beats/libbeat/common/streambuf"
Expand All @@ -32,9 +33,9 @@ import (
// from raw input stream for every decoded line.
type LineReader struct {
reader io.Reader
codec encoding.Encoding
bufferSize int
nl []byte
decodedNl []byte
inBuffer *streambuf.Buffer
outBuffer *streambuf.Buffer
inOffset int // input buffer read offset
Expand All @@ -43,21 +44,26 @@ type LineReader struct {
}

// NewLineReader creates a new reader object
func NewLineReader(input io.Reader, codec encoding.Encoding, bufferSize int) (*LineReader, error) {
encoder := codec.NewEncoder()
func NewLineReader(input io.Reader, config Config) (*LineReader, error) {
encoder := config.Codec.NewEncoder()

// Create newline char based on encoding
nl, _, err := transform.Bytes(encoder, []byte{'\n'})
terminator, ok := lineTerminatorCharacters[config.Terminator]
if !ok {
return nil, fmt.Errorf("unknown line terminator: %+v", config.Terminator)
}

nl, _, err := transform.Bytes(encoder, terminator)
if err != nil {
return nil, err
}

return &LineReader{
reader: input,
codec: codec,
bufferSize: bufferSize,
bufferSize: config.BufferSize,
decoder: config.Codec.NewDecoder(),
nl: nl,
decoder: codec.NewDecoder(),
decodedNl: terminator,
inBuffer: streambuf.New(nil),
outBuffer: streambuf.New(nil),
}, nil
Expand All @@ -74,7 +80,7 @@ func (r *LineReader) Next() ([]byte, int, error) {
return nil, 0, err
}

// Check last decoded byte really being '\n' also unencoded
// Check last decoded byte really being newline also unencoded
// if not, continue reading
buf := r.outBuffer.Bytes()

Expand All @@ -84,14 +90,15 @@ func (r *LineReader) Next() ([]byte, int, error) {
continue
}

if buf[len(buf)-1] == '\n' {
if bytes.HasSuffix(buf, r.decodedNl) {
break
} else {
logp.Debug("line", "Line ending char found which wasn't one: %c", buf[len(buf)-1])
logp.Debug("line", "In %s", string(buf))
}
}

// output buffer contains complete line ending with '\n'. Extract
// output buffer contains complete line ending with newline. Extract
// byte slice from buffer and reset output buffer.
bytes, err := r.outBuffer.Collect(r.outBuffer.Len())
r.outBuffer.Reset()
Expand All @@ -112,7 +119,7 @@ func (r *LineReader) advance() error {
// Initial check if buffer has already a newLine character
idx := r.inBuffer.IndexFrom(r.inOffset, r.nl)

// fill inBuffer until '\n' sequence has been found in input buffer
// fill inBuffer until newline sequence has been found in input buffer
for idx == -1 {
// increase search offset to reduce iterations on buffer when looping
newOffset := r.inBuffer.Len() - len(r.nl)
Expand Down Expand Up @@ -140,7 +147,7 @@ func (r *LineReader) advance() error {
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
}

// found encoded byte sequence for '\n' in buffer
// found encoded byte sequence for newline in buffer
// -> decode input sequence into outBuffer
sz, err := r.decode(idx + len(r.nl))
if err != nil {
Expand All @@ -156,7 +163,7 @@ func (r *LineReader) advance() error {
// continue scanning input buffer from last position + 1
r.inOffset = idx + 1 - sz
if r.inOffset < 0 {
// fix inOffset if '\n' has encoding > 8bits + firl line has been decoded
// fix inOffset if newline has encoding > 8bits + first line has been decoded
r.inOffset = 0
}

Expand Down
85 changes: 85 additions & 0 deletions libbeat/reader/readfile/line_terminator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package readfile

import "fmt"

// LineTerminator identifies the character sequence that marks the end of a
// line. The supported sequences follow the Unicode newline list:
// https://en.wikipedia.org/wiki/Newline#Unicode
type LineTerminator uint8

// The zero value is InvalidTerminator, so an unset LineTerminator is
// distinguishable from every configured terminator.
const (
	// InvalidTerminator is the zero value and names no terminator.
	InvalidTerminator = LineTerminator(iota)
	// AutoLineTerminator accepts both LF and CR+LF line endings.
	AutoLineTerminator
	// LineFeed ends a line at the unicode char LF.
	LineFeed
	// VerticalTab ends a line at the unicode char VT.
	VerticalTab
	// FormFeed ends a line at the unicode char FF.
	FormFeed
	// CarriageReturn ends a line at the unicode char CR.
	CarriageReturn
	// CarriageReturnLineFeed ends a line at the unicode chars CR+LF.
	CarriageReturnLineFeed
	// NextLine ends a line at the unicode char NEL.
	NextLine
	// LineSeparator ends a line at the unicode char LS.
	LineSeparator
	// ParagraphSeparator ends a line at the unicode char PS.
	ParagraphSeparator
)

var (
	// lineTerminators maps the accepted values of the `line_terminator`
	// configuration option to their LineTerminator constants.
	// InvalidTerminator has no entry, so it cannot be selected from a
	// configuration file.
	lineTerminators = map[string]LineTerminator{
		"auto":                      AutoLineTerminator,
		"line_feed":                 LineFeed,
		"vertical_tab":              VerticalTab,
		"form_feed":                 FormFeed,
		"carriage_return":           CarriageReturn,
		"carriage_return_line_feed": CarriageReturnLineFeed,
		"next_line":                 NextLine,
		"line_separator":            LineSeparator,
		"paragraph_separator":       ParagraphSeparator,
	}

	// lineTerminatorCharacters maps each supported LineTerminator to the
	// UTF-8 encoding of its terminator sequence. NewLineReader passes the
	// sequence through the codec's encoder, so every entry must be valid
	// UTF-8.
	//
	// Every entry is written as a string conversion on purpose: a byte
	// literal such as []byte{'\u0085'} yields the single byte 0x85 rather
	// than the two-byte UTF-8 encoding 0xC2 0x85 of NEL.
	lineTerminatorCharacters = map[LineTerminator][]byte{
		// Auto splits on LF; a CR+LF line ending also ends in LF.
		AutoLineTerminator:     []byte("\u000A"),
		LineFeed:               []byte("\u000A"),
		VerticalTab:            []byte("\u000B"),
		FormFeed:               []byte("\u000C"),
		CarriageReturn:         []byte("\u000D"),
		CarriageReturnLineFeed: []byte("\u000D\u000A"),
		NextLine:               []byte("\u0085"),
		LineSeparator:          []byte("\u2028"),
		ParagraphSeparator:     []byte("\u2029"),
	}
)

// Unpack sets l to the LineTerminator registered in lineTerminators under
// the given option name. Unknown names return an error and leave l
// untouched.
func (l *LineTerminator) Unpack(option string) error {
	if value, known := lineTerminators[option]; known {
		*l = value
		return nil
	}

	return fmt.Errorf("invalid line terminator: %s", option)
}
Loading

0 comments on commit ebc7da7

Please sign in to comment.