From b32fa7df4450579c7e7b1d4f3cbc04ee94844f57 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Mon, 4 Mar 2019 13:48:40 +0100 Subject: [PATCH] Ensure kubernetes caches don't expire if they are being read (#10946) Some metrics in metricbeat kubernetes module are cached during a time, if they are not updated they are removed. But it is usual to have pods or containers that are not updated during more time that the expiration cache. Current implementation was not renovating expiration times for cache entries so all were eventually removed if updates for them are not received. Replace it with the cache implementation available in libbeat, but keeping the existing interface. Also, use slashes instead of dashes to generate unique container uids. Dashes can be used by kubernetes names, what could lead to ambiguous keys for the caches. Fix #10658 (cherry picked from commit 106df3d5cc5d2b66130346c2c6188e3cb48635c7) --- CHANGELOG.next.asciidoc | 1 + .../module/kubernetes/util/metrics_cache.go | 95 +++++++++---------- .../kubernetes/util/metrics_cache_test.go | 37 +------- 3 files changed, 46 insertions(+), 87 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 11f916ddfce..e0e5e6912ab 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -62,6 +62,7 @@ https://github.com/elastic/beats/compare/v7.0.0-beta1...master[Check the HEAD di *Metricbeat* - Migrate docker autodiscover to ECS. {issue}10757[10757] {pull}10862[10862] +- Fix issue in kubernetes module preventing usage percentages to be properly calculated. {pull}10946[10946] *Packetbeat* diff --git a/metricbeat/module/kubernetes/util/metrics_cache.go b/metricbeat/module/kubernetes/util/metrics_cache.go index ffa8f235ed5..7ffff06edfe 100644 --- a/metricbeat/module/kubernetes/util/metrics_cache.go +++ b/metricbeat/module/kubernetes/util/metrics_cache.go @@ -18,17 +18,19 @@ package util import ( - "sync" "time" + + "github.com/elastic/beats/libbeat/common" ) // PerfMetrics stores known metrics from Kubernetes nodes and containers var PerfMetrics = NewPerfMetricsCache() -const defaultTimeout = 120 * time.Second +func init() { + PerfMetrics.Start() +} -var now = time.Now -var sleep = time.Sleep +const defaultTimeout = 120 * time.Second // NewPerfMetricsCache initializes and returns a new PerfMetricsCache func NewPerfMetricsCache() *PerfMetricsCache { @@ -43,7 +45,6 @@ func NewPerfMetricsCache() *PerfMetricsCache { // PerfMetricsCache stores known metrics from Kubernetes nodes and containers type PerfMetricsCache struct { - mutex sync.RWMutex NodeMemAllocatable *valueMap NodeCoresAllocatable *valueMap @@ -51,72 +52,64 @@ type PerfMetricsCache struct { ContainerCoresLimit *valueMap } -func newValueMap(timeout time.Duration) *valueMap { - return &valueMap{ - values: map[string]value{}, - timeout: timeout, - } +// Start cache workers +func (c *PerfMetricsCache) Start() { + c.NodeMemAllocatable.Start() + c.NodeCoresAllocatable.Start() + c.ContainerMemLimit.Start() + c.ContainerCoresLimit.Start() } -type valueMap struct { - sync.RWMutex - running bool - timeout time.Duration - values map[string]value +// Stop cache workers +func (c *PerfMetricsCache) Stop() { + c.NodeMemAllocatable.Stop() + c.NodeCoresAllocatable.Stop() + c.ContainerMemLimit.Stop() + c.ContainerCoresLimit.Stop() } -type value struct { - value float64 - expires int64 +type valueMap struct { + cache *common.Cache + timeout time.Duration } -// ContainerUID creates an unique ID for from namespace, pod name and container name -func ContainerUID(namespace, pod, container string) string { - return namespace + "-" + pod + "-" + container +func newValueMap(timeout time.Duration) *valueMap { + return &valueMap{ + cache: common.NewCache(timeout, 0), + timeout: timeout, + } } // Get value func (m *valueMap) Get(name string) float64 { - m.RLock() - defer m.RUnlock() - return m.values[name].value + return m.GetWithDefault(name, 0.0) } // Get value func (m *valueMap) GetWithDefault(name string, def float64) float64 { - m.RLock() - defer m.RUnlock() - val, ok := m.values[name] - if ok { - return val.value + v := m.cache.Get(name) + if v, ok := v.(float64); ok { + return v } return def } // Set value func (m *valueMap) Set(name string, val float64) { - m.Lock() - defer m.Unlock() - m.ensureCleanupWorker() - m.values[name] = value{val, now().Add(m.timeout).Unix()} + m.cache.PutWithTimeout(name, val, m.timeout) } -func (m *valueMap) ensureCleanupWorker() { - if !m.running { - // Run worker to cleanup expired entries - m.running = true - go func() { - for { - sleep(m.timeout) - m.Lock() - now := now().Unix() - for name, val := range m.values { - if now > val.expires { - delete(m.values, name) - } - } - m.Unlock() - } - }() - } +// Start cache workers +func (m *valueMap) Start() { + m.cache.StartJanitor(m.timeout) +} + +// Stop cache workers +func (m *valueMap) Stop() { + m.cache.StopJanitor() +} + +// ContainerUID creates an unique ID for from namespace, pod name and container name +func ContainerUID(namespace, pod, container string) string { + return namespace + "/" + pod + "/" + container } diff --git a/metricbeat/module/kubernetes/util/metrics_cache_test.go b/metricbeat/module/kubernetes/util/metrics_cache_test.go index d5ce7bd2bb8..649c1f5fb86 100644 --- a/metricbeat/module/kubernetes/util/metrics_cache_test.go +++ b/metricbeat/module/kubernetes/util/metrics_cache_test.go @@ -19,45 +19,10 @@ package util import ( "testing" - "time" "github.com/stretchr/testify/assert" ) -func TestTimeout(t *testing.T) { - // Mock monotonic time: - fakeTimeCh := make(chan int64) - go func() { - fakeTime := time.Now().Unix() - for { - fakeTime++ - fakeTimeCh <- fakeTime - } - }() - - now = func() time.Time { - return time.Unix(<-fakeTimeCh, 0) - } - - // Blocking sleep: - sleepCh := make(chan struct{}) - sleep = func(time.Duration) { - <-sleepCh - } - - test := newValueMap(1 * time.Second) - - test.Set("foo", 3.14) - - // Let cleanup do its job - sleepCh <- struct{}{} - sleepCh <- struct{}{} - sleepCh <- struct{}{} - - // Check it expired - assert.Equal(t, 0.0, test.Get("foo")) -} - func TestValueMap(t *testing.T) { test := newValueMap(defaultTimeout) @@ -82,5 +47,5 @@ func TestGetWithDefault(t *testing.T) { } func TestContainerUID(t *testing.T) { - assert.Equal(t, "a-b-c", ContainerUID("a", "b", "c")) + assert.Equal(t, "a/b/c", ContainerUID("a", "b", "c")) }