Skip to content

Commit

Permalink
Cherry-pick elastic#7620 to 6.x: Less granular reader packages (elast…
Browse files Browse the repository at this point in the history
…ic#8166)

Cherry-pick of PR elastic#7620 to 6.x branch. Original message:

I created new packages:

    readjson: includes JSON and DockerJSON
    readline: includes LimitReader, StripNewLineReader, etc.

I left multiline as is for now.
  • Loading branch information
kvch committed Aug 31, 2018
1 parent aa39658 commit 2993d73
Show file tree
Hide file tree
Showing 16 changed files with 50 additions and 56 deletions.
12 changes: 5 additions & 7 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,10 @@ import (
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/reader"
"github.com/elastic/beats/filebeat/reader/docker_json"
"github.com/elastic/beats/filebeat/reader/encode"
"github.com/elastic/beats/filebeat/reader/encode/encoding"
"github.com/elastic/beats/filebeat/reader/json"
"github.com/elastic/beats/filebeat/reader/limit"
"github.com/elastic/beats/filebeat/reader/multiline"
"github.com/elastic/beats/filebeat/reader/strip_newline"
"github.com/elastic/beats/filebeat/reader/readfile"
"github.com/elastic/beats/filebeat/reader/readfile/encoding"
"github.com/elastic/beats/filebeat/util"
)

Expand Down Expand Up @@ -552,7 +550,7 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
return nil, err
}

r, err = encode.New(h.log, h.encoding, h.config.BufferSize)
r, err = readfile.NewEncodeReader(h.log, h.encoding, h.config.BufferSize)
if err != nil {
return nil, err
}
Expand All @@ -566,7 +564,7 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
r = json.New(r, h.config.JSON)
}

r = strip_newline.New(r)
r = readfile.NewStripNewline(r)

if h.config.Multiline != nil {
r, err = multiline.New(r, "\n", h.config.MaxBytes, h.config.Multiline)
Expand All @@ -575,5 +573,5 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
}
}

return limit.New(r, h.config.MaxBytes), nil
return readfile.NewLimitReader(r, h.config.MaxBytes), nil
}
2 changes: 1 addition & 1 deletion filebeat/input/log/harvester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/filebeat/reader"
"github.com/elastic/beats/filebeat/reader/encode/encoding"
"github.com/elastic/beats/filebeat/reader/readfile/encoding"
"github.com/elastic/beats/libbeat/common"
)

Expand Down
4 changes: 2 additions & 2 deletions filebeat/reader/multiline/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"time"

"github.com/elastic/beats/filebeat/reader"
"github.com/elastic/beats/filebeat/reader/timeout"
"github.com/elastic/beats/filebeat/reader/readfile"
"github.com/elastic/beats/libbeat/common/match"
"github.com/elastic/beats/libbeat/logp"
)
Expand Down Expand Up @@ -112,7 +112,7 @@ func New(
}

if tout > 0 {
r = timeout.New(r, sigMultilineTimeout, tout)
r = readfile.NewTimeoutReader(r, sigMultilineTimeout, tout)
}

mlr := &Reader{
Expand Down
9 changes: 4 additions & 5 deletions filebeat/reader/multiline/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/filebeat/reader"
"github.com/elastic/beats/filebeat/reader/encode"
"github.com/elastic/beats/filebeat/reader/encode/encoding"
"github.com/elastic/beats/filebeat/reader/strip_newline"
"github.com/elastic/beats/filebeat/reader/readfile"
"github.com/elastic/beats/filebeat/reader/readfile/encoding"
"github.com/elastic/beats/libbeat/common/match"
)

Expand Down Expand Up @@ -190,12 +189,12 @@ func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reade
}

var r reader.Reader
r, err = encode.New(in, enc, 4096)
r, err = readfile.NewEncodeReader(in, enc, 4096)
if err != nil {
t.Fatalf("Failed to initialize line reader: %v", err)
}

r, err = New(strip_newline.New(r), "\n", 1<<20, &cfg)
r, err = New(readfile.NewStripNewline(r), "\n", 1<<20, &cfg)
if err != nil {
t.Fatalf("failed to initialize reader: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,36 @@
// specific language governing permissions and limitations
// under the License.

package encode
package readfile

import (
"io"
"time"

"github.com/elastic/beats/filebeat/reader"
"github.com/elastic/beats/filebeat/reader/encode/encoding"
"github.com/elastic/beats/filebeat/reader/line"
"github.com/elastic/beats/filebeat/reader/readfile/encoding"
)

// Reader produces lines by reading lines from an io.Reader
// through a decoder converting the reader it's encoding to utf-8.
type Reader struct {
reader *line.Reader
type EncoderReader struct {
reader *LineReader
}

// New creates a new Encode reader from input reader by applying
// the given codec.
func New(
func NewEncodeReader(
r io.Reader,
codec encoding.Encoding,
bufferSize int,
) (Reader, error) {
eReader, err := line.New(r, codec, bufferSize)
return Reader{eReader}, err
) (EncoderReader, error) {
eReader, err := NewLineReader(r, codec, bufferSize)
return EncoderReader{eReader}, err
}

// Next reads the next line from it's initial io.Reader
// This converts a io.Reader to a reader.reader
func (r Reader) Next() (reader.Message, error) {
func (r EncoderReader) Next() (reader.Message, error) {
c, sz, err := r.reader.Next()
// Creating message object
return reader.Message{
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,26 @@
// specific language governing permissions and limitations
// under the License.

package limit
package readfile

import (
"github.com/elastic/beats/filebeat/reader"
)

// Reader sets an upper limited on line length. Lines longer
// then the max configured line length will be snapped short.
type Reader struct {
type LimitReader struct {
reader reader.Reader
maxBytes int
}

// New creates a new reader limiting the line length.
func New(r reader.Reader, maxBytes int) *Reader {
return &Reader{reader: r, maxBytes: maxBytes}
func NewLimitReader(r reader.Reader, maxBytes int) *LimitReader {
return &LimitReader{reader: r, maxBytes: maxBytes}
}

// Next returns the next line.
func (r *Reader) Next() (reader.Message, error) {
func (r *LimitReader) Next() (reader.Message, error) {
message, err := r.reader.Next()
if len(message.Content) > r.maxBytes {
message.Content = message.Content[:r.maxBytes]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package line
package readfile

import (
"io"
Expand All @@ -30,7 +30,7 @@ import (
// lineReader reads lines from underlying reader, decoding the input stream
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
type Reader struct {
type LineReader struct {
reader io.Reader
codec encoding.Encoding
bufferSize int
Expand All @@ -43,7 +43,7 @@ type Reader struct {
}

// New creates a new reader object
func New(input io.Reader, codec encoding.Encoding, bufferSize int) (*Reader, error) {
func NewLineReader(input io.Reader, codec encoding.Encoding, bufferSize int) (*LineReader, error) {
encoder := codec.NewEncoder()

// Create newline char based on encoding
Expand All @@ -52,7 +52,7 @@ func New(input io.Reader, codec encoding.Encoding, bufferSize int) (*Reader, err
return nil, err
}

return &Reader{
return &LineReader{
reader: input,
codec: codec,
bufferSize: bufferSize,
Expand All @@ -64,7 +64,7 @@ func New(input io.Reader, codec encoding.Encoding, bufferSize int) (*Reader, err
}

// Next reads the next line until the new line character
func (r *Reader) Next() ([]byte, int, error) {
func (r *LineReader) Next() ([]byte, int, error) {
// This loop is need in case advance detects an line ending which turns out
// not to be one when decoded. If that is the case, reading continues.
for {
Expand Down Expand Up @@ -108,7 +108,7 @@ func (r *Reader) Next() ([]byte, int, error) {

// Reads from the buffer until a new line character is detected
// Returns an error otherwise
func (r *Reader) advance() error {
func (r *LineReader) advance() error {
// Initial check if buffer has already a newLine character
idx := r.inBuffer.IndexFrom(r.inOffset, r.nl)

Expand Down Expand Up @@ -163,7 +163,7 @@ func (r *Reader) advance() error {
return err
}

func (r *Reader) decode(end int) (int, error) {
func (r *LineReader) decode(end int) (int, error) {
var err error
buffer := make([]byte, 1024)
inBytes := r.inBuffer.Bytes()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

// +build !integration

package line
package readfile

import (
"bytes"
Expand All @@ -27,7 +27,7 @@ import (
"github.com/stretchr/testify/assert"
"golang.org/x/text/transform"

"github.com/elastic/beats/filebeat/reader/encode/encoding"
"github.com/elastic/beats/filebeat/reader/readfile/encoding"
)

// Sample texts are from http://www.columbia.edu/~kermit/utf8.html
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestReaderEncodings(t *testing.T) {
}

// create line reader
reader, err := New(buffer, codec, 1024)
reader, err := NewLineReader(buffer, codec, 1024)
if err != nil {
t.Errorf("failed to initialize reader: %v", err)
continue
Expand Down Expand Up @@ -159,7 +159,7 @@ func testReadLines(t *testing.T, inputLines [][]byte) {
// initialize reader
buffer := bytes.NewBuffer(inputStream)
codec, _ := encoding.Plain(buffer)
reader, err := New(buffer, codec, buffer.Len())
reader, err := NewLineReader(buffer, codec, buffer.Len())
if err != nil {
t.Fatalf("Error initializing reader: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package strip_newline
package readfile

import (
"github.com/elastic/beats/filebeat/reader"
Expand All @@ -28,7 +28,7 @@ type StripNewline struct {
}

// New creates a new line reader stripping the last tailing newline.
func New(r reader.Reader) *StripNewline {
func NewStripNewline(r reader.Reader) *StripNewline {
return &StripNewline{r}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

// +build !integration

package strip_newline
package readfile

import (
"testing"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package timeout
package readfile

import (
"errors"
Expand All @@ -30,7 +30,7 @@ var (

// timeoutProcessor will signal some configurable timeout error if no
// new line can be returned in time.
type Reader struct {
type TimeoutReader struct {
reader reader.Reader
timeout time.Duration
signal error
Expand All @@ -44,12 +44,12 @@ type lineMessage struct {
}

// New returns a new timeout reader from an input line reader.
func New(reader reader.Reader, signal error, t time.Duration) *Reader {
func NewTimeoutReader(reader reader.Reader, signal error, t time.Duration) *TimeoutReader {
if signal == nil {
signal = errTimeout
}

return &Reader{
return &TimeoutReader{
reader: reader,
signal: signal,
timeout: t,
Expand All @@ -62,7 +62,7 @@ func New(reader reader.Reader, signal error, t time.Duration) *Reader {
// For handline timeouts a goroutine is started for reading lines from
// configured line reader. Only when underlying reader returns an error, the
// goroutine will be finished.
func (r *Reader) Next() (reader.Message, error) {
func (r *TimeoutReader) Next() (reader.Message, error) {
if !r.running {
r.running = true
go func() {
Expand Down
12 changes: 5 additions & 7 deletions filebeat/scripts/tester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@ import (
"time"

"github.com/elastic/beats/filebeat/reader"
"github.com/elastic/beats/filebeat/reader/encode"
"github.com/elastic/beats/filebeat/reader/encode/encoding"
"github.com/elastic/beats/filebeat/reader/limit"
"github.com/elastic/beats/filebeat/reader/multiline"
"github.com/elastic/beats/filebeat/reader/strip_newline"
"github.com/elastic/beats/filebeat/reader/readfile"
"github.com/elastic/beats/filebeat/reader/readfile/encoding"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/match"
)
Expand Down Expand Up @@ -135,12 +133,12 @@ func getLogsFromFile(logfile string, conf *logReaderConfig) ([]string, error) {
}

var r reader.Reader
r, err = encode.New(f, enc, 4096)
r, err = readfile.NewEncodeReader(f, enc, 4096)
if err != nil {
return nil, err
}

r = strip_newline.New(r)
r = readfile.NewStripNewline(r)

if conf.multiPattern != "" {
p, err := match.Compile(conf.multiPattern)
Expand All @@ -158,7 +156,7 @@ func getLogsFromFile(logfile string, conf *logReaderConfig) ([]string, error) {
return nil, err
}
}
r = limit.New(r, conf.maxBytes)
r = readfile.NewLimitReader(r, conf.maxBytes)

var logs []string
for {
Expand Down

0 comments on commit 2993d73

Please sign in to comment.