Skip to content

Commit

Permalink
Reduce casting between []byte and string in CRI log processing (elast…
Browse files Browse the repository at this point in the history
…ic#8424)

* Reduce casting between []byte and string in CRI log processing

* Refactor CRI and json-file detection

* Improve handling of new lines in linux/windows CRI logs.

* Move new line removal to OS specific functions.
* Add option to force CRI log parsing, otherwise autodetect log type.

* Fixed unit tests and new line removal

* Fixed new line removal
* Added unit tests to check Forced CRI flag

(cherry picked from commit 3f7d6a6)
  • Loading branch information
jareksm authored and Carlos Pérez-Aradros Herce committed Oct 23, 2018
1 parent b94753e commit 89cd72c
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 18 deletions.
1 change: 1 addition & 0 deletions filebeat/input/log/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type config struct {
DockerJSON *struct {
Stream string `config:"stream"`
Partial bool `config:"partial"`
ForceCRI bool `config:"force_cri_logs"`
CRIFlags bool `config:"cri_flags"`
} `config:"docker-json"`
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {

if h.config.DockerJSON != nil {
// Docker json-file format, add custom parsing to the pipeline
r = docker_json.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial, h.config.DockerJSON.CRIFlags)
r = docker_json.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial, h.config.DockerJSON.ForceCRI, h.config.DockerJSON.CRIFlags)
}

if h.config.JSON != nil {
Expand Down
32 changes: 19 additions & 13 deletions filebeat/reader/docker_json/docker_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package docker_json
import (
"bytes"
"encoding/json"
"strings"
"time"

"github.com/elastic/beats/filebeat/reader"
Expand All @@ -38,6 +37,9 @@ type Reader struct {
// join partial lines
partial bool

// Force log format: json-file | cri
forceCRI bool

// parse CRI flags
criflags bool
}
Expand All @@ -51,11 +53,12 @@ type logLine struct {
}

// New creates a new reader that parses Docker json-file or CRI log lines read from r
func New(r reader.Reader, stream string, partial bool, CRIFlags bool) *Reader {
func New(r reader.Reader, stream string, partial bool, forceCRI bool, CRIFlags bool) *Reader {
return &Reader{
stream: stream,
partial: partial,
reader: r,
forceCRI: forceCRI,
criflags: CRIFlags,
}
}
Expand All @@ -74,28 +77,28 @@ func (p *Reader) parseCRILog(message *reader.Message, msg *logLine) error {
i := 0

// timestamp
log := strings.SplitN(string(message.Content), " ", split)
log := bytes.SplitN(message.Content, []byte{' '}, split)
if len(log) < split {
return errors.New("invalid CRI log format")
}
ts, err := time.Parse(time.RFC3339, log[i])
ts, err := time.Parse(time.RFC3339, string(log[i]))
if err != nil {
return errors.Wrap(err, "parsing CRI timestamp")
}
message.Ts = ts
i++

// stream
msg.Stream = log[i]
msg.Stream = string(log[i])
i++

// tags
partial := false
if p.criflags {
// currently only P(artial) or F(ull) are available
tags := strings.Split(log[i], ":")
tags := bytes.Split(log[i], []byte{':'})
for _, tag := range tags {
if tag == "P" {
if len(tag) == 1 && tag[0] == 'P' {
partial = true
}
}
Expand All @@ -106,12 +109,10 @@ func (p *Reader) parseCRILog(message *reader.Message, msg *logLine) error {
message.AddFields(common.MapStr{
"stream": msg.Stream,
})
// Remove ending \n for partial messages
message.Content = []byte(log[i])
// Remove \n ending for partial messages
message.Content = log[i]
if partial {
message.Content = bytes.TrimRightFunc(message.Content, func(r rune) bool {
return r == '\n' || r == '\r'
})
stripNewLine(message)
}

return nil
Expand Down Expand Up @@ -144,7 +145,12 @@ func (p *Reader) parseDockerJSONLog(message *reader.Message, msg *logLine) error
}

func (p *Reader) parseLine(message *reader.Message, msg *logLine) error {
if strings.HasPrefix(string(message.Content), "{") {
if p.forceCRI {
return p.parseCRILog(message, msg)
}

// If forceCRI isn't set, autodetect file type
if len(message.Content) > 0 && message.Content[0] == '{' {
return p.parseDockerJSONLog(message, msg)
}

Expand Down
76 changes: 72 additions & 4 deletions filebeat/reader/docker_json/docker_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestDockerJSON(t *testing.T) {
input [][]byte
stream string
partial bool
forceCRI bool
criflags bool
expectedError bool
expectedMessage reader.Message
Expand Down Expand Up @@ -143,19 +144,19 @@ func TestDockerJSON(t *testing.T) {
},
},
{
name: "Split lines",
name: "CRI Split lines",
input: [][]byte{
[]byte(`2017-10-12T13:32:21.232861448Z stdout P 2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`),
[]byte(`2017-11-12T23:32:21.212771448Z stdout F error`),
},
stream: "stdout",
stream: "stdout",
partial: true,
expectedMessage: reader.Message{
Content: []byte("2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache error"),
Fields: common.MapStr{"stream": "stdout"},
Ts: time.Date(2017, 10, 12, 13, 32, 21, 232861448, time.UTC),
Bytes: 163,
},
partial: true,
criflags: true,
},
{
Expand Down Expand Up @@ -189,12 +190,79 @@ func TestDockerJSON(t *testing.T) {
Bytes: 109,
},
},
{
name: "Force CRI with JSON logs",
input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)},
stream: "all",
forceCRI: true,
expectedError: true,
},
{
name: "Force CRI log no tags",
input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`)},
stream: "all",
expectedMessage: reader.Message{
Content: []byte("2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache"),
Fields: common.MapStr{"stream": "stdout"},
Ts: time.Date(2017, 9, 12, 22, 32, 21, 212861448, time.UTC),
Bytes: 115,
},
forceCRI: true,
criflags: false,
},
{
name: "Force CRI log with flags",
input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout F 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`)},
stream: "all",
expectedMessage: reader.Message{
Content: []byte("2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache"),
Fields: common.MapStr{"stream": "stdout"},
Ts: time.Date(2017, 9, 12, 22, 32, 21, 212861448, time.UTC),
Bytes: 117,
},
forceCRI: true,
criflags: true,
},
{
name: "Force CRI split lines",
input: [][]byte{
[]byte(`2017-10-12T13:32:21.232861448Z stdout P 2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`),
[]byte(`2017-11-12T23:32:21.212771448Z stdout F error`),
},
stream: "stdout",
partial: true,
expectedMessage: reader.Message{
Content: []byte("2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache error"),
Fields: common.MapStr{"stream": "stdout"},
Ts: time.Date(2017, 10, 12, 13, 32, 21, 232861448, time.UTC),
Bytes: 163,
},
forceCRI: true,
criflags: true,
},
{
name: "Force CRI split lines and remove \\n",
input: [][]byte{
[]byte("2017-10-12T13:32:21.232861448Z stdout P 2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache\n"),
[]byte("2017-11-12T23:32:21.212771448Z stdout F error"),
},
stream: "stdout",
expectedMessage: reader.Message{
Content: []byte("2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache error"),
Fields: common.MapStr{"stream": "stdout"},
Ts: time.Date(2017, 10, 12, 13, 32, 21, 232861448, time.UTC),
Bytes: 164,
},
partial: true,
forceCRI: true,
criflags: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
r := &mockReader{messages: test.input}
json := New(r, test.stream, test.partial, test.criflags)
json := New(r, test.stream, test.partial, test.forceCRI, test.criflags)
message, err := json.Next()

if test.expectedError {
Expand Down
30 changes: 30 additions & 0 deletions libbeat/reader/readjson/docker_json_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// +build linux darwin

package readjson

import (
"github.com/elastic/beats/libbeat/reader"
)

// stripNewLine removes a single trailing '\n' from msg.Content when one is
// present. Content that is empty or does not end in '\n' is left untouched.
func stripNewLine(msg *reader.Message) {
	size := len(msg.Content)
	if size == 0 {
		return
	}

	// Only the final byte is inspected, so at most one newline is dropped.
	last := size - 1
	if msg.Content[last] != '\n' {
		return
	}
	msg.Content = msg.Content[:last]
}
30 changes: 30 additions & 0 deletions libbeat/reader/readjson/docker_json_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package readjson

import (
"bytes"

"github.com/elastic/beats/libbeat/reader"
)

// stripNewLine trims every trailing '\n' and '\r' from msg.Content, so both
// "\n" and "\r\n" line endings (including repeated ones) are removed.
func stripNewLine(msg *reader.Message) {
	isLineEnding := func(c rune) bool {
		switch c {
		case '\n', '\r':
			return true
		default:
			return false
		}
	}
	msg.Content = bytes.TrimRightFunc(msg.Content, isLineEnding)
}

0 comments on commit 89cd72c

Please sign in to comment.