Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use add_kubernetes_metadata IP & port matching in Packetbeat #5707

Merged
merged 2 commits into from
Nov 28, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di

*Packetbeat*

- Configure good defaults for `add_kubernetes_metadata`. {pull}5707[5707]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add 2 change entries. One to "applies to all beats" that the IpProcessor is now available for all beats (also filebeat for example) and the other one as above that IpProcessor is set as default for PB.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point! I added a new entry


*Winlogbeat*

==== Deprecated
Expand Down
79 changes: 79 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/indexers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
const (
ContainerIndexerName = "container"
PodNameIndexerName = "pod_name"
IPPortIndexerName = "ip_port"
)

// Indexer take known pods and generate all the metadata we need to enrich
Expand Down Expand Up @@ -248,3 +249,81 @@ func containerID(status PodContainerStatus) string {
}
return ""
}

// IPPortIndexer indexes pods based on all their host:port combinations
type IPPortIndexer struct {
genMeta GenMeta
}

// NewIPPortIndexer creates and returns a new indexer for pod IP & ports
func NewIPPortIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) {
return &IPPortIndexer{genMeta: genMeta}, nil
}

// GetMetadata returns metadata for the given pod, if it matches the index
func (h *IPPortIndexer) GetMetadata(pod *Pod) []MetadataIndex {
commonMeta := h.genMeta.GenerateMetaData(pod)
hostPorts := h.GetIndexes(pod)
var metadata []MetadataIndex

if pod.Status.PodIP == "" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a pod does not have an IP address, it means no metadata is availabe? Should this return an error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We wouldn't be dealing with this error anyway, it could make sense to log this, but I think it's just defensive code

return metadata
}
for i := 0; i < len(hostPorts); i++ {
dobreak := false
containerMeta := commonMeta.Clone()
for _, container := range pod.Spec.Containers {
ports := container.Ports

for _, port := range ports {
if port.ContainerPort == int64(0) {
continue
}
if strings.Index(hostPorts[i], fmt.Sprintf("%s:%d", pod.Status.PodIP, port.ContainerPort)) != -1 {
containerMeta["container"] = common.MapStr{
"name": container.Name,
}
dobreak = true
break
}
}

if dobreak {
break
}

}

metadata = append(metadata, MetadataIndex{
Index: hostPorts[i],
Data: containerMeta,
})
}

return metadata
}

// GetIndexes returns the indexes for the given Pod
func (h *IPPortIndexer) GetIndexes(pod *Pod) []string {
var hostPorts []string

ip := pod.Status.PodIP
if ip == "" {
return hostPorts
}
for _, container := range pod.Spec.Containers {
ports := container.Ports

for _, port := range ports {
if port.ContainerPort != int64(0) {
hostPorts = append(hostPorts, fmt.Sprintf("%s:%d", ip, port.ContainerPort))
} else {
hostPorts = append(hostPorts, ip)
}

}

}

return hostPorts
}
66 changes: 66 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/indexers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,69 @@ func TestFilteredGenMetaExclusion(t *testing.T) {
ok, _ = labelMap.HasKey("x")
assert.Equal(t, ok, false)
}

func TestIpPortIndexer(t *testing.T) {
var testConfig = common.NewConfig()

ipIndexer, err := NewIPPortIndexer(*testConfig, metagen)
assert.Nil(t, err)

podName := "testpod"
ns := "testns"
container := "container"
ip := "1.2.3.4"
port := int64(80)
pod := Pod{
Metadata: ObjectMeta{
Name: podName,
Namespace: ns,
Labels: map[string]string{
"labelkey": "labelvalue",
},
},
Spec: PodSpec{
Containers: make([]Container, 0),
},

Status: PodStatus{
PodIP: ip,
},
}

indexers := ipIndexer.GetMetadata(&pod)
indices := ipIndexer.GetIndexes(&pod)
assert.Equal(t, len(indexers), 0)
assert.Equal(t, len(indices), 0)
expected := common.MapStr{
"pod": common.MapStr{
"name": "testpod",
},
"namespace": "testns",
"labels": common.MapStr{
"labelkey": "labelvalue",
},
}

pod.Spec.Containers = []Container{
{
Name: container,
Ports: []ContainerPort{
{
Name: container,
ContainerPort: port,
},
},
},
}
expected["container"] = common.MapStr{"name": container}

indexers = ipIndexer.GetMetadata(&pod)
assert.Equal(t, len(indexers), 1)
assert.Equal(t, indexers[0].Index, fmt.Sprintf("%s:%d", ip, port))

indices = ipIndexer.GetIndexes(&pod)
assert.Equal(t, len(indices), 1)
assert.Equal(t, indices[0], fmt.Sprintf("%s:%d", ip, port))

assert.Equal(t, expected.String(), indexers[0].Data.String())
}
1 change: 1 addition & 0 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func init() {
// Register default indexers
Indexing.AddIndexer(PodNameIndexerName, NewPodNameIndexer)
Indexing.AddIndexer(ContainerIndexerName, NewContainerIndexer)
Indexing.AddIndexer(IPPortIndexerName, NewIPPortIndexer)
Indexing.AddMatcher(FieldMatcherName, NewFieldMatcher)
Indexing.AddMatcher(FieldFormatMatcherName, NewFieldFormatMatcher)
}
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/module"

// Add metricbeat specific processors
// Add metricbeat default processors
_ "github.com/elastic/beats/metricbeat/processor/add_kubernetes_metadata"
)

Expand Down
85 changes: 1 addition & 84 deletions metricbeat/processor/add_kubernetes_metadata/indexers.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,16 @@
package add_kubernetes_metadata

import (
"fmt"
"strings"

"github.com/elastic/beats/libbeat/common"
kubernetes "github.com/elastic/beats/libbeat/processors/add_kubernetes_metadata"
)

const (
IpPortIndexerName = "ip_port"
)

func init() {
// Register default indexers
kubernetes.Indexing.AddIndexer(IpPortIndexerName, NewIpPortIndexer)
cfg := common.NewConfig()

//Add IP Port Indexer as a default indexer
kubernetes.Indexing.AddDefaultIndexerConfig(IpPortIndexerName, *cfg)
kubernetes.Indexing.AddDefaultIndexerConfig(kubernetes.IPPortIndexerName, *cfg)

config := map[string]interface{}{
"lookup_fields": []string{"metricset.host"},
Expand All @@ -29,78 +21,3 @@ func init() {
kubernetes.Indexing.AddDefaultMatcherConfig(kubernetes.FieldMatcherName, *fieldCfg)
}
}

// IpPortIndexer indexes pods based on all their host:port combinations
type IpPortIndexer struct {
genMeta kubernetes.GenMeta
}

func NewIpPortIndexer(_ common.Config, genMeta kubernetes.GenMeta) (kubernetes.Indexer, error) {
return &IpPortIndexer{genMeta: genMeta}, nil
}

func (h *IpPortIndexer) GetMetadata(pod *kubernetes.Pod) []kubernetes.MetadataIndex {
commonMeta := h.genMeta.GenerateMetaData(pod)
hostPorts := h.GetIndexes(pod)
var metadata []kubernetes.MetadataIndex

if pod.Status.PodIP == "" {
return metadata
}
for i := 0; i < len(hostPorts); i++ {
dobreak := false
containerMeta := commonMeta.Clone()
for _, container := range pod.Spec.Containers {
ports := container.Ports

for _, port := range ports {
if port.ContainerPort == int64(0) {
continue
}
if strings.Index(hostPorts[i], fmt.Sprintf("%s:%d", pod.Status.PodIP, port.ContainerPort)) != -1 {
containerMeta["container"] = common.MapStr{
"name": container.Name,
}
dobreak = true
break
}
}

if dobreak {
break
}

}

metadata = append(metadata, kubernetes.MetadataIndex{
Index: hostPorts[i],
Data: containerMeta,
})
}

return metadata
}

func (h *IpPortIndexer) GetIndexes(pod *kubernetes.Pod) []string {
var hostPorts []string

ip := pod.Status.PodIP
if ip == "" {
return hostPorts
}
for _, container := range pod.Spec.Containers {
ports := container.Ports

for _, port := range ports {
if port.ContainerPort != int64(0) {
hostPorts = append(hostPorts, fmt.Sprintf("%s:%d", ip, port.ContainerPort))
} else {
hostPorts = append(hostPorts, ip)
}

}

}

return hostPorts
}
79 changes: 0 additions & 79 deletions metricbeat/processor/add_kubernetes_metadata/indexers_test.go

This file was deleted.

3 changes: 3 additions & 0 deletions packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"github.com/elastic/beats/packetbeat/protos/udp"
"github.com/elastic/beats/packetbeat/publish"
"github.com/elastic/beats/packetbeat/sniffer"

// Add packetbeat default processors
_ "github.com/elastic/beats/packetbeat/processor/add_kubernetes_metadata"
)

// Beater object. Contains all objects needed to run the beat
Expand Down
Loading