From a9900168b06ce2390b3734a3a98ab1e2967bf39f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Fri, 24 Nov 2017 15:53:23 +0100 Subject: [PATCH 1/2] Use `add_kubernetes_metadata` IP & port matching in Packetbeat The code was there for Metricbeat already, this PR moves it to libbeat and adapts Packetbeat to include it --- CHANGELOG.asciidoc | 2 + .../add_kubernetes_metadata/indexers.go | 79 +++++++++++++++++ .../add_kubernetes_metadata/indexers_test.go | 66 ++++++++++++++ .../add_kubernetes_metadata/kubernetes.go | 1 + metricbeat/beater/metricbeat.go | 2 +- .../add_kubernetes_metadata/indexers.go | 85 +------------------ .../add_kubernetes_metadata/indexers_test.go | 79 ----------------- packetbeat/beater/packetbeat.go | 3 + .../add_kubernetes_metadata/indexers.go | 22 +++++ 9 files changed, 175 insertions(+), 164 deletions(-) delete mode 100644 metricbeat/processor/add_kubernetes_metadata/indexers_test.go create mode 100644 packetbeat/processor/add_kubernetes_metadata/indexers.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 90c58e593fd..a34f5d77134 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -122,6 +122,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di *Packetbeat* +- Configure good defaults for `add_kubernetes_metadata`. {pull}5707[5707] + *Winlogbeat* ==== Deprecated diff --git a/libbeat/processors/add_kubernetes_metadata/indexers.go b/libbeat/processors/add_kubernetes_metadata/indexers.go index 8976e637350..b23df9016b7 100644 --- a/libbeat/processors/add_kubernetes_metadata/indexers.go +++ b/libbeat/processors/add_kubernetes_metadata/indexers.go @@ -12,6 +12,7 @@ import ( const ( ContainerIndexerName = "container" PodNameIndexerName = "pod_name" + IPPortIndexerName = "ip_port" ) // Indexer take known pods and generate all the metadata we need to enrich @@ -248,3 +249,81 @@ func containerID(status PodContainerStatus) string { } return "" } + +// IPPortIndexer indexes pods based on all their host:port combinations +type IPPortIndexer struct { + genMeta GenMeta +} + +// NewIPPortIndexer creates and returns a new indexer for pod IP & ports +func NewIPPortIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) { + return &IPPortIndexer{genMeta: genMeta}, nil +} + +// GetMetadata returns metadata for the given pod, if it matches the index +func (h *IPPortIndexer) GetMetadata(pod *Pod) []MetadataIndex { + commonMeta := h.genMeta.GenerateMetaData(pod) + hostPorts := h.GetIndexes(pod) + var metadata []MetadataIndex + + if pod.Status.PodIP == "" { + return metadata + } + for i := 0; i < len(hostPorts); i++ { + dobreak := false + containerMeta := commonMeta.Clone() + for _, container := range pod.Spec.Containers { + ports := container.Ports + + for _, port := range ports { + if port.ContainerPort == int64(0) { + continue + } + if strings.Index(hostPorts[i], fmt.Sprintf("%s:%d", pod.Status.PodIP, port.ContainerPort)) != -1 { + containerMeta["container"] = common.MapStr{ + "name": container.Name, + } + dobreak = true + break + } + } + + if dobreak { + break + } + + } + + metadata = append(metadata, MetadataIndex{ + Index: hostPorts[i], + Data: containerMeta, + }) + } + + return metadata +} + +// GetIndexes returns the indexes for the given Pod +func (h *IPPortIndexer) GetIndexes(pod *Pod) []string { + var hostPorts []string + + ip := pod.Status.PodIP + if ip == "" { + return hostPorts + } + for _, container := range pod.Spec.Containers { + ports := container.Ports + + for _, port := range ports { + if port.ContainerPort != int64(0) { + hostPorts = append(hostPorts, fmt.Sprintf("%s:%d", ip, port.ContainerPort)) + } else { + hostPorts = append(hostPorts, ip) + } + + } + + } + + return hostPorts +} diff --git a/libbeat/processors/add_kubernetes_metadata/indexers_test.go b/libbeat/processors/add_kubernetes_metadata/indexers_test.go index 16edcedf64a..bc6a6378c94 100644 --- a/libbeat/processors/add_kubernetes_metadata/indexers_test.go +++ b/libbeat/processors/add_kubernetes_metadata/indexers_test.go @@ -237,3 +237,69 @@ func TestFilteredGenMetaExclusion(t *testing.T) { ok, _ = labelMap.HasKey("x") assert.Equal(t, ok, false) } + +func TestIpPortIndexer(t *testing.T) { + var testConfig = common.NewConfig() + + ipIndexer, err := NewIPPortIndexer(*testConfig, metagen) + assert.Nil(t, err) + + podName := "testpod" + ns := "testns" + container := "container" + ip := "1.2.3.4" + port := int64(80) + pod := Pod{ + Metadata: ObjectMeta{ + Name: podName, + Namespace: ns, + Labels: map[string]string{ + "labelkey": "labelvalue", + }, + }, + Spec: PodSpec{ + Containers: make([]Container, 0), + }, + + Status: PodStatus{ + PodIP: ip, + }, + } + + indexers := ipIndexer.GetMetadata(&pod) + indices := ipIndexer.GetIndexes(&pod) + assert.Equal(t, len(indexers), 0) + assert.Equal(t, len(indices), 0) + expected := common.MapStr{ + "pod": common.MapStr{ + "name": "testpod", + }, + "namespace": "testns", + "labels": common.MapStr{ + "labelkey": "labelvalue", + }, + } + + pod.Spec.Containers = []Container{ + { + Name: container, + Ports: []ContainerPort{ + { + Name: container, + ContainerPort: port, + }, + }, + }, + } + expected["container"] = common.MapStr{"name": container} + + indexers = ipIndexer.GetMetadata(&pod) + assert.Equal(t, len(indexers), 1) + assert.Equal(t, indexers[0].Index, fmt.Sprintf("%s:%d", ip, port)) + + indices = ipIndexer.GetIndexes(&pod) + assert.Equal(t, len(indices), 1) + assert.Equal(t, indices[0], fmt.Sprintf("%s:%d", ip, port)) + + assert.Equal(t, expected.String(), indexers[0].Data.String()) +} diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 5780aa99cee..4fa4384efae 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -37,6 +37,7 @@ func init() { // Register default indexers Indexing.AddIndexer(PodNameIndexerName, NewPodNameIndexer) Indexing.AddIndexer(ContainerIndexerName, NewContainerIndexer) + Indexing.AddIndexer(IPPortIndexerName, NewIPPortIndexer) Indexing.AddMatcher(FieldMatcherName, NewFieldMatcher) Indexing.AddMatcher(FieldFormatMatcherName, NewFieldFormatMatcher) } diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index 84f4ab3b7ae..f8b3d7795d5 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -16,7 +16,7 @@ import ( "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/module" - // Add metricbeat specific processors + // Add metricbeat default processors _ "github.com/elastic/beats/metricbeat/processor/add_kubernetes_metadata" ) diff --git a/metricbeat/processor/add_kubernetes_metadata/indexers.go b/metricbeat/processor/add_kubernetes_metadata/indexers.go index f896d14a6c2..9b01cfab901 100644 --- a/metricbeat/processor/add_kubernetes_metadata/indexers.go +++ b/metricbeat/processor/add_kubernetes_metadata/indexers.go @@ -1,24 +1,16 @@ package add_kubernetes_metadata import ( - "fmt" - "strings" - "github.com/elastic/beats/libbeat/common" kubernetes "github.com/elastic/beats/libbeat/processors/add_kubernetes_metadata" ) -const ( - IpPortIndexerName = "ip_port" -) - func init() { // Register default indexers - kubernetes.Indexing.AddIndexer(IpPortIndexerName, NewIpPortIndexer) cfg := common.NewConfig() //Add IP Port Indexer as a default indexer - kubernetes.Indexing.AddDefaultIndexerConfig(IpPortIndexerName, *cfg) + kubernetes.Indexing.AddDefaultIndexerConfig(kubernetes.IPPortIndexerName, *cfg) config := map[string]interface{}{ "lookup_fields": []string{"metricset.host"}, @@ -29,78 +21,3 @@ func init() { kubernetes.Indexing.AddDefaultMatcherConfig(kubernetes.FieldMatcherName, *fieldCfg) } } - -// IpPortIndexer indexes pods based on all their host:port combinations -type IpPortIndexer struct { - genMeta kubernetes.GenMeta -} - -func NewIpPortIndexer(_ common.Config, genMeta kubernetes.GenMeta) (kubernetes.Indexer, error) { - return &IpPortIndexer{genMeta: genMeta}, nil -} - -func (h *IpPortIndexer) GetMetadata(pod *kubernetes.Pod) []kubernetes.MetadataIndex { - commonMeta := h.genMeta.GenerateMetaData(pod) - hostPorts := h.GetIndexes(pod) - var metadata []kubernetes.MetadataIndex - - if pod.Status.PodIP == "" { - return metadata - } - for i := 0; i < len(hostPorts); i++ { - dobreak := false - containerMeta := commonMeta.Clone() - for _, container := range pod.Spec.Containers { - ports := container.Ports - - for _, port := range ports { - if port.ContainerPort == int64(0) { - continue - } - if strings.Index(hostPorts[i], fmt.Sprintf("%s:%d", pod.Status.PodIP, port.ContainerPort)) != -1 { - containerMeta["container"] = common.MapStr{ - "name": container.Name, - } - dobreak = true - break - } - } - - if dobreak { - break - } - - } - - metadata = append(metadata, kubernetes.MetadataIndex{ - Index: hostPorts[i], - Data: containerMeta, - }) - } - - return metadata -} - -func (h *IpPortIndexer) GetIndexes(pod *kubernetes.Pod) []string { - var hostPorts []string - - ip := pod.Status.PodIP - if ip == "" { - return hostPorts - } - for _, container := range pod.Spec.Containers { - ports := container.Ports - - for _, port := range ports { - if port.ContainerPort != int64(0) { - hostPorts = append(hostPorts, fmt.Sprintf("%s:%d", ip, port.ContainerPort)) - } else { - hostPorts = append(hostPorts, ip) - } - - } - - } - - return hostPorts -} diff --git a/metricbeat/processor/add_kubernetes_metadata/indexers_test.go b/metricbeat/processor/add_kubernetes_metadata/indexers_test.go deleted file mode 100644 index 86e6003b9c1..00000000000 --- a/metricbeat/processor/add_kubernetes_metadata/indexers_test.go +++ /dev/null @@ -1,79 +0,0 @@ -package add_kubernetes_metadata - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/processors/add_kubernetes_metadata" -) - -var metagen = &add_kubernetes_metadata.GenDefaultMeta{} - -func TestIpPortIndexer(t *testing.T) { - var testConfig = common.NewConfig() - - ipIndexer, err := NewIpPortIndexer(*testConfig, metagen) - assert.Nil(t, err) - - podName := "testpod" - ns := "testns" - container := "container" - ip := "1.2.3.4" - port := int64(80) - pod := add_kubernetes_metadata.Pod{ - Metadata: add_kubernetes_metadata.ObjectMeta{ - Name: podName, - Namespace: ns, - Labels: map[string]string{ - "labelkey": "labelvalue", - }, - }, - Spec: add_kubernetes_metadata.PodSpec{ - Containers: make([]add_kubernetes_metadata.Container, 0), - }, - - Status: add_kubernetes_metadata.PodStatus{ - PodIP: ip, - }, - } - - indexers := ipIndexer.GetMetadata(&pod) - indices := ipIndexer.GetIndexes(&pod) - assert.Equal(t, len(indexers), 0) - assert.Equal(t, len(indices), 0) - expected := common.MapStr{ - "pod": common.MapStr{ - "name": "testpod", - }, - "namespace": "testns", - "labels": common.MapStr{ - "labelkey": "labelvalue", - }, - } - - pod.Spec.Containers = []add_kubernetes_metadata.Container{ - { - Name: container, - Ports: []add_kubernetes_metadata.ContainerPort{ - { - Name: container, - ContainerPort: port, - }, - }, - }, - } - expected["container"] = common.MapStr{"name": container} - - indexers = ipIndexer.GetMetadata(&pod) - assert.Equal(t, len(indexers), 1) - assert.Equal(t, indexers[0].Index, fmt.Sprintf("%s:%d", ip, port)) - - indices = ipIndexer.GetIndexes(&pod) - assert.Equal(t, len(indices), 1) - assert.Equal(t, indices[0], fmt.Sprintf("%s:%d", ip, port)) - - assert.Equal(t, expected.String(), indexers[0].Data.String()) -} diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index 6e0d265f391..e4f4734f690 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -25,6 +25,9 @@ import ( "github.com/elastic/beats/packetbeat/protos/udp" "github.com/elastic/beats/packetbeat/publish" "github.com/elastic/beats/packetbeat/sniffer" + + // Add packetbeat default processors + _ "github.com/elastic/beats/packetbeat/processor/add_kubernetes_metadata" ) // Beater object. Contains all objects needed to run the beat diff --git a/packetbeat/processor/add_kubernetes_metadata/indexers.go b/packetbeat/processor/add_kubernetes_metadata/indexers.go new file mode 100644 index 00000000000..3496d2f5c74 --- /dev/null +++ b/packetbeat/processor/add_kubernetes_metadata/indexers.go @@ -0,0 +1,22 @@ +package add_kubernetes_metadata + +import ( + "github.com/elastic/beats/libbeat/common" + kubernetes "github.com/elastic/beats/libbeat/processors/add_kubernetes_metadata" +) + +func init() { + // Register default indexers + cfg := common.NewConfig() + + //Add IP Port Indexer as a default indexer + kubernetes.Indexing.AddDefaultIndexerConfig(kubernetes.IPPortIndexerName, *cfg) + + formatCfg, err := common.NewConfigFrom(map[string]interface{}{ + "format": "%{[ip]}:%{[port]}", + }) + if err == nil { + //Add field matcher with field to lookup as metricset.host + kubernetes.Indexing.AddDefaultMatcherConfig(kubernetes.FieldFormatMatcherName, *formatCfg) + } +} From 130621cdcc5aa76088c2ed3b5595875181e70b18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Mon, 27 Nov 2017 12:27:39 +0100 Subject: [PATCH 2/2] Update CHANGELOG --- CHANGELOG.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index a34f5d77134..d3a199dd4f4 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -86,6 +86,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Update to Golang 1.9.2 - Add number_of_routing_shards config set to 30 {pull}5570[5570] - Set log level for kafka output. {pull}5397[5397] +- Moved `ip_port` indexer for `add_kubernetes_metadata` to all beats. {pull}5707[5707] *Auditbeat*