Skip to content

Commit

Permalink
add_kubernetes_metadata processor: add support for "/var/log/containe…
Browse files Browse the repository at this point in the history
…rs/" log path (#4981)

* add_kubernetes_metadata processor supports '/var/log/containers/' log path

The add_kubernetes_metadata processor's LogPathMatcher could extract
a Docker container ID - and hence enrich a log document with Kubernetes
metadata - only if the log path was '/var/lib/docker/containers/'.

With this commit, the LogPathMatcher can extract the container ID also
from a '/var/log/containers/' log path (Kubernetes symlinks).
  • Loading branch information
SvenWoltmann authored and exekias committed Aug 25, 2017
1 parent 5657197 commit 4ac68d2
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta1...master[Check the HEAD di

- Add PostgreSQL module with slowlog support. {pull}4763[4763]
- Add Kafka log module. {pull}4885[4885]
- Add support for `/var/log/containers/` log path in `add_kubernetes_metadata` processor. {pull}4981[4981]

*Heartbeat*

Expand Down
36 changes: 29 additions & 7 deletions filebeat/processor/add_kubernetes_metadata/indexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,45 @@ func newLogsPathMatcher(cfg common.Config) (add_kubernetes_metadata.Matcher, err
logPath = logPath + "/"
}

logp.Debug("kubernetes", "logs_path matcher log path: %s", logPath)

return &LogPathMatcher{LogsPath: logPath}, nil
}

// Docker container ID is a 64-character-long hexadecimal string
const containerIdLen = 64

func (f *LogPathMatcher) MetadataIndex(event common.MapStr) string {
if value, ok := event["source"]; ok {
source := value.(string)
logp.Debug("kubernetes", "Incoming source value: ", source)
cid := ""
if strings.Contains(source, f.LogsPath) {
//Docker container is 64 chars in length
cid = source[len(f.LogsPath) : len(f.LogsPath)+64]
logp.Debug("kubernetes", "Incoming source value: %s", source)

if !strings.Contains(source, f.LogsPath) {
logp.Debug("kubernetes", "Error extracting container id - source value does not contain matcher's logs_path '%s'.", f.LogsPath)
return ""
}

sourceLen := len(source)
logsPathLen := len(f.LogsPath)

// In case of the Kubernetes log path "/var/log/containers/",
// the container ID will be located right before the ".log" extension.
if strings.HasPrefix(f.LogsPath, "/var/log/containers/") && strings.HasSuffix(source, ".log") && sourceLen >= containerIdLen+4 {
containerIdEnd := sourceLen - 4
cid := source[containerIdEnd-containerIdLen : containerIdEnd]
logp.Debug("kubernetes", "Using container id: %s", cid)
return cid
}
logp.Debug("kubernetes", "Using container id: ", cid)

if cid != "" {
// In any other case, we assume the container ID will follow right after the log path.
// However we need to check the length to prevent "slice bound out of range" runtime errors.
if sourceLen >= logsPathLen+containerIdLen {
cid := source[logsPathLen : logsPathLen+containerIdLen]
logp.Debug("kubernetes", "Using container id: %s", cid)
return cid
}

logp.Debug("kubernetes", "Error extracting container id - source value contains matcher's logs_path, however it is too short to contain a Docker container ID.")
}

return ""
Expand Down
63 changes: 51 additions & 12 deletions filebeat/processor/add_kubernetes_metadata/indexing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,63 @@ import (
"github.com/elastic/beats/libbeat/common"
)

func TestLogsPathMatcher(t *testing.T) {
// A random container ID that we use for our tests
const cid = "0069869de9adf97f574c62029aeba65d1ecd85a2a112e87fbc28afe4dec2b843"

func TestLogsPathMatcher_InvalidSource1(t *testing.T) {
cfgLogsPath := "" // use the default matcher configuration
source := "/var/log/messages"
expectedResult := ""
executeTest(t, cfgLogsPath, source, expectedResult)
}

func TestLogsPathMatcher_InvalidSource2(t *testing.T) {
cfgLogsPath := "" // use the default matcher configuration
source := "/var/lib/docker/containers/01234567/89abcdef-json.log"
expectedResult := ""
executeTest(t, cfgLogsPath, source, expectedResult)
}

func TestLogsPathMatcher_InvalidSource3(t *testing.T) {
cfgLogsPath := "/var/log/containers/"
source := "/var/log/containers/pod_ns_container_01234567.log"
expectedResult := ""
executeTest(t, cfgLogsPath, source, expectedResult)
}

func TestLogsPathMatcher_VarLibDockerContainers(t *testing.T) {
cfgLogsPath := "" // use the default matcher configuration
source := fmt.Sprintf("/var/lib/docker/containers/%s/%s-json.log", cid, cid)
expectedResult := cid
executeTest(t, cfgLogsPath, source, expectedResult)
}

func TestLogsPathMatcher_VarLogContainers(t *testing.T) {
cfgLogsPath := "/var/log/containers/"
source := fmt.Sprintf("/var/log/containers/kube-proxy-4d7nt_kube-system_kube-proxy-%s.log", cid)
expectedResult := cid
executeTest(t, cfgLogsPath, source, expectedResult)
}

func TestLogsPathMatcher_AnotherLogDir(t *testing.T) {
cfgLogsPath := "/var/log/other/"
source := fmt.Sprintf("/var/log/other/%s.log", cid)
expectedResult := cid
executeTest(t, cfgLogsPath, source, expectedResult)
}

func executeTest(t *testing.T, cfgLogsPath string, source string, expectedResult string) {
var testConfig = common.NewConfig()
if cfgLogsPath != "" {
testConfig.SetString("logs_path", -1, cfgLogsPath)
}

logMatcher, err := newLogsPathMatcher(*testConfig)
assert.Nil(t, err)

cid := "0069869de9adf97f574c62029aeba65d1ecd85a2a112e87fbc28afe4dec2b843"
logPath := fmt.Sprintf("/var/lib/docker/containers/%s/%s-json.log", cid, cid)

input := common.MapStr{
"source": "/var/log/messages",
"source": source,
}

output := logMatcher.MetadataIndex(input)
assert.Equal(t, output, "")

input["source"] = logPath
output = logMatcher.MetadataIndex(input)

assert.Equal(t, output, cid)
assert.Equal(t, output, expectedResult)
}

0 comments on commit 4ac68d2

Please sign in to comment.