From 2993d731c1a9d163327e49926285a434cb67a14e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Fri, 31 Aug 2018 09:50:15 +0200 Subject: [PATCH] Cherry-pick #7620 to 6.x: Less granular reader packages (#8166) Cherry-pick of PR #7620 to 6.x branch. Original message: I created new packages: readjson: includes JSON and DockerJSON readline: includes LimitReader, StripNewLineReader, etc. I left multiline as is for now. --- filebeat/input/log/harvester.go | 12 +++++------- filebeat/input/log/harvester_test.go | 2 +- filebeat/reader/multiline/multiline.go | 4 ++-- filebeat/reader/multiline/multiline_test.go | 9 ++++----- .../reader/{encode => readfile}/encode.go | 19 +++++++++---------- .../{encode => readfile}/encoding/encoding.go | 0 .../{encode => readfile}/encoding/mixed.go | 0 .../{encode => readfile}/encoding/utf16.go | 0 .../encoding/utf16_test.go | 0 filebeat/reader/{limit => readfile}/limit.go | 10 +++++----- filebeat/reader/{line => readfile}/line.go | 14 +++++++------- .../reader/{line => readfile}/line_test.go | 8 ++++---- .../strip_newline.go | 4 ++-- .../strip_newline_test.go | 2 +- .../reader/{timeout => readfile}/timeout.go | 10 +++++----- filebeat/scripts/tester/main.go | 12 +++++------- 16 files changed, 50 insertions(+), 56 deletions(-) rename filebeat/reader/{encode => readfile}/encode.go (81%) rename filebeat/reader/{encode => readfile}/encoding/encoding.go (100%) rename filebeat/reader/{encode => readfile}/encoding/mixed.go (100%) rename filebeat/reader/{encode => readfile}/encoding/utf16.go (100%) rename filebeat/reader/{encode => readfile}/encoding/utf16_test.go (100%) rename filebeat/reader/{limit => readfile}/limit.go (85%) rename filebeat/reader/{line => readfile}/line.go (94%) rename filebeat/reader/{line => readfile}/line_test.go (96%) rename filebeat/reader/{strip_newline => readfile}/strip_newline.go (96%) rename filebeat/reader/{strip_newline => readfile}/strip_newline_test.go (98%) rename filebeat/reader/{timeout => readfile}/timeout.go (90%) diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index d1d4f148185..9e7422eec62 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -51,12 +51,10 @@ import ( "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/filebeat/reader" "github.com/elastic/beats/filebeat/reader/docker_json" - "github.com/elastic/beats/filebeat/reader/encode" - "github.com/elastic/beats/filebeat/reader/encode/encoding" "github.com/elastic/beats/filebeat/reader/json" - "github.com/elastic/beats/filebeat/reader/limit" "github.com/elastic/beats/filebeat/reader/multiline" - "github.com/elastic/beats/filebeat/reader/strip_newline" + "github.com/elastic/beats/filebeat/reader/readfile" + "github.com/elastic/beats/filebeat/reader/readfile/encoding" "github.com/elastic/beats/filebeat/util" ) @@ -552,7 +550,7 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) { return nil, err } - r, err = encode.New(h.log, h.encoding, h.config.BufferSize) + r, err = readfile.NewEncodeReader(h.log, h.encoding, h.config.BufferSize) if err != nil { return nil, err } @@ -566,7 +564,7 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) { r = json.New(r, h.config.JSON) } - r = strip_newline.New(r) + r = readfile.NewStripNewline(r) if h.config.Multiline != nil { r, err = multiline.New(r, "\n", h.config.MaxBytes, h.config.Multiline) @@ -575,5 +573,5 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) { } } - return limit.New(r, h.config.MaxBytes), nil + return readfile.NewLimitReader(r, h.config.MaxBytes), nil } diff --git a/filebeat/input/log/harvester_test.go b/filebeat/input/log/harvester_test.go index a311743cc93..f3f6697a8de 100644 --- a/filebeat/input/log/harvester_test.go +++ b/filebeat/input/log/harvester_test.go @@ -31,7 +31,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/filebeat/reader" - "github.com/elastic/beats/filebeat/reader/encode/encoding" + "github.com/elastic/beats/filebeat/reader/readfile/encoding" "github.com/elastic/beats/libbeat/common" ) diff --git a/filebeat/reader/multiline/multiline.go b/filebeat/reader/multiline/multiline.go index f1b871f6c74..57209be94cd 100644 --- a/filebeat/reader/multiline/multiline.go +++ b/filebeat/reader/multiline/multiline.go @@ -23,7 +23,7 @@ import ( "time" "github.com/elastic/beats/filebeat/reader" - "github.com/elastic/beats/filebeat/reader/timeout" + "github.com/elastic/beats/filebeat/reader/readfile" "github.com/elastic/beats/libbeat/common/match" "github.com/elastic/beats/libbeat/logp" ) @@ -112,7 +112,7 @@ func New( } if tout > 0 { - r = timeout.New(r, sigMultilineTimeout, tout) + r = readfile.NewTimeoutReader(r, sigMultilineTimeout, tout) } mlr := &Reader{ diff --git a/filebeat/reader/multiline/multiline_test.go b/filebeat/reader/multiline/multiline_test.go index 50ad18f17d1..6fe05fde0d2 100644 --- a/filebeat/reader/multiline/multiline_test.go +++ b/filebeat/reader/multiline/multiline_test.go @@ -30,9 +30,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/filebeat/reader" - "github.com/elastic/beats/filebeat/reader/encode" - "github.com/elastic/beats/filebeat/reader/encode/encoding" - "github.com/elastic/beats/filebeat/reader/strip_newline" + "github.com/elastic/beats/filebeat/reader/readfile" + "github.com/elastic/beats/filebeat/reader/readfile/encoding" "github.com/elastic/beats/libbeat/common/match" ) @@ -190,12 +189,12 @@ func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reade } var r reader.Reader - r, err = encode.New(in, enc, 4096) + r, err = readfile.NewEncodeReader(in, enc, 4096) if err != nil { t.Fatalf("Failed to initialize line reader: %v", err) } - r, err = New(strip_newline.New(r), "\n", 1<<20, &cfg) + r, err = New(readfile.NewStripNewline(r), "\n", 1<<20, &cfg) if err != nil { t.Fatalf("failed to initialize reader: %v", err) } diff --git a/filebeat/reader/encode/encode.go b/filebeat/reader/readfile/encode.go similarity index 81% rename from filebeat/reader/encode/encode.go rename to filebeat/reader/readfile/encode.go index d0306689200..a86e6541bec 100644 --- a/filebeat/reader/encode/encode.go +++ b/filebeat/reader/readfile/encode.go @@ -15,37 +15,36 @@ // specific language governing permissions and limitations // under the License. -package encode +package readfile import ( "io" "time" "github.com/elastic/beats/filebeat/reader" - "github.com/elastic/beats/filebeat/reader/encode/encoding" - "github.com/elastic/beats/filebeat/reader/line" + "github.com/elastic/beats/filebeat/reader/readfile/encoding" ) // Reader produces lines by reading lines from an io.Reader // through a decoder converting the reader it's encoding to utf-8. -type Reader struct { - reader *line.Reader +type EncoderReader struct { + reader *LineReader } // New creates a new Encode reader from input reader by applying // the given codec. -func New( +func NewEncodeReader( r io.Reader, codec encoding.Encoding, bufferSize int, -) (Reader, error) { - eReader, err := line.New(r, codec, bufferSize) - return Reader{eReader}, err +) (EncoderReader, error) { + eReader, err := NewLineReader(r, codec, bufferSize) + return EncoderReader{eReader}, err } // Next reads the next line from it's initial io.Reader // This converts a io.Reader to a reader.reader -func (r Reader) Next() (reader.Message, error) { +func (r EncoderReader) Next() (reader.Message, error) { c, sz, err := r.reader.Next() // Creating message object return reader.Message{ diff --git a/filebeat/reader/encode/encoding/encoding.go b/filebeat/reader/readfile/encoding/encoding.go similarity index 100% rename from filebeat/reader/encode/encoding/encoding.go rename to filebeat/reader/readfile/encoding/encoding.go diff --git a/filebeat/reader/encode/encoding/mixed.go b/filebeat/reader/readfile/encoding/mixed.go similarity index 100% rename from filebeat/reader/encode/encoding/mixed.go rename to filebeat/reader/readfile/encoding/mixed.go diff --git a/filebeat/reader/encode/encoding/utf16.go b/filebeat/reader/readfile/encoding/utf16.go similarity index 100% rename from filebeat/reader/encode/encoding/utf16.go rename to filebeat/reader/readfile/encoding/utf16.go diff --git a/filebeat/reader/encode/encoding/utf16_test.go b/filebeat/reader/readfile/encoding/utf16_test.go similarity index 100% rename from filebeat/reader/encode/encoding/utf16_test.go rename to filebeat/reader/readfile/encoding/utf16_test.go diff --git a/filebeat/reader/limit/limit.go b/filebeat/reader/readfile/limit.go similarity index 85% rename from filebeat/reader/limit/limit.go rename to filebeat/reader/readfile/limit.go index 356ba4395fb..1d7b46e2a47 100644 --- a/filebeat/reader/limit/limit.go +++ b/filebeat/reader/readfile/limit.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package limit +package readfile import ( "github.com/elastic/beats/filebeat/reader" @@ -23,18 +23,18 @@ import ( // Reader sets an upper limited on line length. Lines longer // then the max configured line length will be snapped short. -type Reader struct { +type LimitReader struct { reader reader.Reader maxBytes int } // New creates a new reader limiting the line length. -func New(r reader.Reader, maxBytes int) *Reader { - return &Reader{reader: r, maxBytes: maxBytes} +func NewLimitReader(r reader.Reader, maxBytes int) *LimitReader { + return &LimitReader{reader: r, maxBytes: maxBytes} } // Next returns the next line. -func (r *Reader) Next() (reader.Message, error) { +func (r *LimitReader) Next() (reader.Message, error) { message, err := r.reader.Next() if len(message.Content) > r.maxBytes { message.Content = message.Content[:r.maxBytes] diff --git a/filebeat/reader/line/line.go b/filebeat/reader/readfile/line.go similarity index 94% rename from filebeat/reader/line/line.go rename to filebeat/reader/readfile/line.go index e3b3c176421..be2714b498f 100644 --- a/filebeat/reader/line/line.go +++ b/filebeat/reader/readfile/line.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package line +package readfile import ( "io" @@ -30,7 +30,7 @@ import ( // lineReader reads lines from underlying reader, decoding the input stream // using the configured codec. The reader keeps track of bytes consumed // from raw input stream for every decoded line. -type Reader struct { +type LineReader struct { reader io.Reader codec encoding.Encoding bufferSize int @@ -43,7 +43,7 @@ type Reader struct { } // New creates a new reader object -func New(input io.Reader, codec encoding.Encoding, bufferSize int) (*Reader, error) { +func NewLineReader(input io.Reader, codec encoding.Encoding, bufferSize int) (*LineReader, error) { encoder := codec.NewEncoder() // Create newline char based on encoding @@ -52,7 +52,7 @@ func New(input io.Reader, codec encoding.Encoding, bufferSize int) (*Reader, err return nil, err } - return &Reader{ + return &LineReader{ reader: input, codec: codec, bufferSize: bufferSize, @@ -64,7 +64,7 @@ func New(input io.Reader, codec encoding.Encoding, bufferSize int) (*Reader, err } // Next reads the next line until the new line character -func (r *Reader) Next() ([]byte, int, error) { +func (r *LineReader) Next() ([]byte, int, error) { // This loop is need in case advance detects an line ending which turns out // not to be one when decoded. If that is the case, reading continues. for { @@ -108,7 +108,7 @@ func (r *Reader) Next() ([]byte, int, error) { // Reads from the buffer until a new line character is detected // Returns an error otherwise -func (r *Reader) advance() error { +func (r *LineReader) advance() error { // Initial check if buffer has already a newLine character idx := r.inBuffer.IndexFrom(r.inOffset, r.nl) @@ -163,7 +163,7 @@ func (r *Reader) advance() error { return err } -func (r *Reader) decode(end int) (int, error) { +func (r *LineReader) decode(end int) (int, error) { var err error buffer := make([]byte, 1024) inBytes := r.inBuffer.Bytes() diff --git a/filebeat/reader/line/line_test.go b/filebeat/reader/readfile/line_test.go similarity index 96% rename from filebeat/reader/line/line_test.go rename to filebeat/reader/readfile/line_test.go index 6b420b7645a..9d244f0decf 100644 --- a/filebeat/reader/line/line_test.go +++ b/filebeat/reader/readfile/line_test.go @@ -17,7 +17,7 @@ // +build !integration -package line +package readfile import ( "bytes" @@ -27,7 +27,7 @@ import ( "github.com/stretchr/testify/assert" "golang.org/x/text/transform" - "github.com/elastic/beats/filebeat/reader/encode/encoding" + "github.com/elastic/beats/filebeat/reader/readfile/encoding" ) // Sample texts are from http://www.columbia.edu/~kermit/utf8.html @@ -68,7 +68,7 @@ func TestReaderEncodings(t *testing.T) { } // create line reader - reader, err := New(buffer, codec, 1024) + reader, err := NewLineReader(buffer, codec, 1024) if err != nil { t.Errorf("failed to initialize reader: %v", err) continue @@ -159,7 +159,7 @@ func testReadLines(t *testing.T, inputLines [][]byte) { // initialize reader buffer := bytes.NewBuffer(inputStream) codec, _ := encoding.Plain(buffer) - reader, err := New(buffer, codec, buffer.Len()) + reader, err := NewLineReader(buffer, codec, buffer.Len()) if err != nil { t.Fatalf("Error initializing reader: %v", err) } diff --git a/filebeat/reader/strip_newline/strip_newline.go b/filebeat/reader/readfile/strip_newline.go similarity index 96% rename from filebeat/reader/strip_newline/strip_newline.go rename to filebeat/reader/readfile/strip_newline.go index b62ba4381f6..3394cb9289e 100644 --- a/filebeat/reader/strip_newline/strip_newline.go +++ b/filebeat/reader/readfile/strip_newline.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package strip_newline +package readfile import ( "github.com/elastic/beats/filebeat/reader" @@ -28,7 +28,7 @@ type StripNewline struct { } // New creates a new line reader stripping the last tailing newline. -func New(r reader.Reader) *StripNewline { +func NewStripNewline(r reader.Reader) *StripNewline { return &StripNewline{r} } diff --git a/filebeat/reader/strip_newline/strip_newline_test.go b/filebeat/reader/readfile/strip_newline_test.go similarity index 98% rename from filebeat/reader/strip_newline/strip_newline_test.go rename to filebeat/reader/readfile/strip_newline_test.go index 8d18ea44e41..543056393e5 100644 --- a/filebeat/reader/strip_newline/strip_newline_test.go +++ b/filebeat/reader/readfile/strip_newline_test.go @@ -17,7 +17,7 @@ // +build !integration -package strip_newline +package readfile import ( "testing" diff --git a/filebeat/reader/timeout/timeout.go b/filebeat/reader/readfile/timeout.go similarity index 90% rename from filebeat/reader/timeout/timeout.go rename to filebeat/reader/readfile/timeout.go index 2e3fe3ef895..ea73bd60737 100644 --- a/filebeat/reader/timeout/timeout.go +++ b/filebeat/reader/readfile/timeout.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package timeout +package readfile import ( "errors" @@ -30,7 +30,7 @@ var ( // timeoutProcessor will signal some configurable timeout error if no // new line can be returned in time. -type Reader struct { +type TimeoutReader struct { reader reader.Reader timeout time.Duration signal error @@ -44,12 +44,12 @@ type lineMessage struct { } // New returns a new timeout reader from an input line reader. -func New(reader reader.Reader, signal error, t time.Duration) *Reader { +func NewTimeoutReader(reader reader.Reader, signal error, t time.Duration) *TimeoutReader { if signal == nil { signal = errTimeout } - return &Reader{ + return &TimeoutReader{ reader: reader, signal: signal, timeout: t, @@ -62,7 +62,7 @@ func New(reader reader.Reader, signal error, t time.Duration) *Reader { // For handline timeouts a goroutine is started for reading lines from // configured line reader. Only when underlying reader returns an error, the // goroutine will be finished. -func (r *Reader) Next() (reader.Message, error) { +func (r *TimeoutReader) Next() (reader.Message, error) { if !r.running { r.running = true go func() { diff --git a/filebeat/scripts/tester/main.go b/filebeat/scripts/tester/main.go index 5e4538ba191..3300b8ef623 100644 --- a/filebeat/scripts/tester/main.go +++ b/filebeat/scripts/tester/main.go @@ -30,11 +30,9 @@ import ( "time" "github.com/elastic/beats/filebeat/reader" - "github.com/elastic/beats/filebeat/reader/encode" - "github.com/elastic/beats/filebeat/reader/encode/encoding" - "github.com/elastic/beats/filebeat/reader/limit" "github.com/elastic/beats/filebeat/reader/multiline" - "github.com/elastic/beats/filebeat/reader/strip_newline" + "github.com/elastic/beats/filebeat/reader/readfile" + "github.com/elastic/beats/filebeat/reader/readfile/encoding" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/match" ) @@ -135,12 +133,12 @@ func getLogsFromFile(logfile string, conf *logReaderConfig) ([]string, error) { } var r reader.Reader - r, err = encode.New(f, enc, 4096) + r, err = readfile.NewEncodeReader(f, enc, 4096) if err != nil { return nil, err } - r = strip_newline.New(r) + r = readfile.NewStripNewline(r) if conf.multiPattern != "" { p, err := match.Compile(conf.multiPattern) @@ -158,7 +156,7 @@ func getLogsFromFile(logfile string, conf *logReaderConfig) ([]string, error) { return nil, err } } - r = limit.New(r, conf.maxBytes) + r = readfile.NewLimitReader(r, conf.maxBytes) var logs []string for {