From 7494e24752fd5a82a4005a717f6a71f1b67bf9e1 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Wed, 17 Jun 2020 20:04:12 +0200 Subject: [PATCH 01/10] Add tests for Cloud Foundry versions in metricbeat --- .../container/container_integration_test.go | 14 ++++++++++++++ .../counter/counter_integration_test.go | 14 ++++++++++++++ .../cloudfoundry/value/value_integration_test.go | 14 ++++++++++++++ 3 files changed, 42 insertions(+) diff --git a/x-pack/metricbeat/module/cloudfoundry/container/container_integration_test.go b/x-pack/metricbeat/module/cloudfoundry/container/container_integration_test.go index b05683b487e..e871a5823fc 100644 --- a/x-pack/metricbeat/module/cloudfoundry/container/container_integration_test.go +++ b/x-pack/metricbeat/module/cloudfoundry/container/container_integration_test.go @@ -13,12 +13,26 @@ import ( "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/logp" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest" ) func TestFetch(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + + t.Run("v1", func(t *testing.T) { + testFetch(t, "v1") + }) + + t.Run("v2", func(t *testing.T) { + testFetch(t, "v2") + }) +} + +func testFetch(t *testing.T, version string) { config := mtest.GetConfig(t, "container") + config["version"] = version ms := mbtest.NewPushMetricSetV2(t, config) events := mbtest.RunPushMetricSetV2(60*time.Second, 1, ms) diff --git a/x-pack/metricbeat/module/cloudfoundry/counter/counter_integration_test.go b/x-pack/metricbeat/module/cloudfoundry/counter/counter_integration_test.go index 6a87ce6f951..44cb4935e70 100644 --- a/x-pack/metricbeat/module/cloudfoundry/counter/counter_integration_test.go +++ b/x-pack/metricbeat/module/cloudfoundry/counter/counter_integration_test.go @@ -13,12 +13,26 @@ import ( "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/logp" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest" ) func TestFetch(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + + t.Run("v1", func(t *testing.T) { + testFetch(t, "v1") + }) + + t.Run("v2", func(t *testing.T) { + testFetch(t, "v2") + }) +} + +func testFetch(t *testing.T, version string) { config := mtest.GetConfig(t, "counter") + config["version"] = version ms := mbtest.NewPushMetricSetV2(t, config) events := mbtest.RunPushMetricSetV2(10*time.Second, 1, ms) diff --git a/x-pack/metricbeat/module/cloudfoundry/value/value_integration_test.go b/x-pack/metricbeat/module/cloudfoundry/value/value_integration_test.go index 03d11bb6b7e..610a0a8e029 100644 --- a/x-pack/metricbeat/module/cloudfoundry/value/value_integration_test.go +++ b/x-pack/metricbeat/module/cloudfoundry/value/value_integration_test.go @@ -13,12 +13,26 @@ import ( "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/logp" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest" ) func TestFetch(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + + t.Run("v1", func(t *testing.T) { + testFetch(t, "v1") + }) + + t.Run("v2", func(t *testing.T) { + testFetch(t, "v2") + }) +} + +func testFetch(t *testing.T, version string) { config := mtest.GetConfig(t, "value") + config["version"] = version ms := mbtest.NewPushMetricSetV2(t, config) events := mbtest.RunPushMetricSetV2(10*time.Second, 1, ms) From cb9d1cc71d82a9f27d19abeb62a6cbcee4cd7d80 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Wed, 17 Jun 2020 20:23:07 +0200 Subject: [PATCH 02/10] Initial support for multiple versions --- .../module/cloudfoundry/cloudfoundry.go | 126 ++--------------- .../cloudfoundry/container/container.go | 4 +- .../module/cloudfoundry/counter/counter.go | 4 +- x-pack/metricbeat/module/cloudfoundry/v1.go | 5 + x-pack/metricbeat/module/cloudfoundry/v2.go | 128 ++++++++++++++++++ .../module/cloudfoundry/value/value.go | 4 +- 6 files changed, 153 insertions(+), 118 deletions(-) create mode 100644 x-pack/metricbeat/module/cloudfoundry/v1.go create mode 100644 x-pack/metricbeat/module/cloudfoundry/v2.go diff --git a/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go b/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go index 96cded6fa7e..90b26762d43 100644 --- a/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go +++ b/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go @@ -5,8 +5,7 @@ package cloudfoundry import ( - "context" - "sync" + "fmt" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/mb" @@ -16,26 +15,18 @@ import ( // ModuleName is the name of this module. const ModuleName = "cloudfoundry" -type Module struct { - mb.BaseModule - - log *logp.Logger - - hub *cfcommon.Hub - listener *cfcommon.RlpListener - listenerLock sync.Mutex - - counterReporter mb.PushReporterV2 - valueReporter mb.PushReporterV2 - containerReporter mb.PushReporterV2 -} - func init() { if err := mb.Registry.AddModule(ModuleName, newModule); err != nil { panic(err) } } +type Module interface { + RunCounterReporter(mb.PushReporterV2) + RunContainerReporter(mb.PushReporterV2) + RunValueReporter(mb.PushReporterV2) +} + func newModule(base mb.BaseModule) (mb.Module, error) { var cfg cfcommon.Config if err := base.UnpackConfig(&cfg); err != nil { @@ -45,101 +36,12 @@ func newModule(base mb.BaseModule) (mb.Module, error) { log := logp.NewLogger("cloudfoundry") hub := cfcommon.NewHub(&cfg, "metricbeat", log) - // early check that listener can be created - _, err := hub.RlpListener(cfcommon.RlpListenerCallbacks{}) - if err != nil { - return nil, err - - } - - return &Module{ - BaseModule: base, - log: log, - hub: hub, - }, nil -} - -func (m *Module) RunCounterReporter(reporter mb.PushReporterV2) { - m.listenerLock.Lock() - m.runReporters(reporter, m.valueReporter, m.containerReporter) - m.listenerLock.Unlock() - - <-reporter.Done() - - m.listenerLock.Lock() - m.runReporters(nil, m.valueReporter, m.containerReporter) - m.listenerLock.Unlock() -} - -func (m *Module) RunValueReporter(reporter mb.PushReporterV2) { - m.listenerLock.Lock() - m.runReporters(m.counterReporter, reporter, m.containerReporter) - m.listenerLock.Unlock() - - <-reporter.Done() - - m.listenerLock.Lock() - m.runReporters(m.counterReporter, nil, m.containerReporter) - m.listenerLock.Unlock() -} - -func (m *Module) RunContainerReporter(reporter mb.PushReporterV2) { - m.listenerLock.Lock() - m.runReporters(m.counterReporter, m.valueReporter, reporter) - m.listenerLock.Unlock() - - <-reporter.Done() - - m.listenerLock.Lock() - m.runReporters(m.counterReporter, m.valueReporter, nil) - m.listenerLock.Unlock() -} - -func (m *Module) runReporters(counterReporter, valueReporter, containerReporter mb.PushReporterV2) { - if m.listener != nil { - m.listener.Stop() - m.listener = nil - } - m.counterReporter = counterReporter - m.valueReporter = valueReporter - m.containerReporter = containerReporter - - start := false - callbacks := cfcommon.RlpListenerCallbacks{} - if m.counterReporter != nil { - start = true - callbacks.Counter = func(evt *cfcommon.EventCounter) { - m.counterReporter.Event(mb.Event{ - Timestamp: evt.Timestamp(), - RootFields: evt.ToFields(), - }) - } - } - if m.valueReporter != nil { - start = true - callbacks.ValueMetric = func(evt *cfcommon.EventValueMetric) { - m.valueReporter.Event(mb.Event{ - Timestamp: evt.Timestamp(), - RootFields: evt.ToFields(), - }) - } - } - if m.containerReporter != nil { - start = true - callbacks.ContainerMetric = func(evt *cfcommon.EventContainerMetric) { - m.containerReporter.Event(mb.Event{ - Timestamp: evt.Timestamp(), - RootFields: evt.ToFields(), - }) - } - } - if start { - l, err := m.hub.RlpListener(callbacks) - if err != nil { - m.log.Errorf("failed to create RlpListener: %v", err) - return - } - l.Start(context.Background()) - m.listener = l + switch cfg.Version { + //case cfcommon.ConsumerVersionV1: + // return newModuleV1(base, hub, log) + case cfcommon.ConsumerVersionV2: + return newModuleV2(base, hub, log) + default: + return nil, fmt.Errorf("not supported consumer version: %s", cfg.Version) } } diff --git a/x-pack/metricbeat/module/cloudfoundry/container/container.go b/x-pack/metricbeat/module/cloudfoundry/container/container.go index 73019fb8808..4f8c6227103 100644 --- a/x-pack/metricbeat/module/cloudfoundry/container/container.go +++ b/x-pack/metricbeat/module/cloudfoundry/container/container.go @@ -25,14 +25,14 @@ func init() { type MetricSet struct { mb.BaseMetricSet - mod *cloudfoundry.Module + mod cloudfoundry.Module } // New create a new instance of the MetricSet // Part of new is also setting up the configuration by processing additional // configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - mod, ok := base.Module().(*cloudfoundry.Module) + mod, ok := base.Module().(cloudfoundry.Module) if !ok { return nil, fmt.Errorf("must be child of cloudfoundry module") } diff --git a/x-pack/metricbeat/module/cloudfoundry/counter/counter.go b/x-pack/metricbeat/module/cloudfoundry/counter/counter.go index 10022f87d04..53d3833d810 100644 --- a/x-pack/metricbeat/module/cloudfoundry/counter/counter.go +++ b/x-pack/metricbeat/module/cloudfoundry/counter/counter.go @@ -25,14 +25,14 @@ func init() { type MetricSet struct { mb.BaseMetricSet - mod *cloudfoundry.Module + mod cloudfoundry.Module } // New create a new instance of the MetricSet // Part of new is also setting up the configuration by processing additional // configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - mod, ok := base.Module().(*cloudfoundry.Module) + mod, ok := base.Module().(cloudfoundry.Module) if !ok { return nil, fmt.Errorf("must be child of cloudfoundry module") } diff --git a/x-pack/metricbeat/module/cloudfoundry/v1.go b/x-pack/metricbeat/module/cloudfoundry/v1.go new file mode 100644 index 00000000000..de397ce22eb --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/v1.go @@ -0,0 +1,5 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cloudfoundry diff --git a/x-pack/metricbeat/module/cloudfoundry/v2.go b/x-pack/metricbeat/module/cloudfoundry/v2.go new file mode 100644 index 00000000000..5cf7de6c103 --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/v2.go @@ -0,0 +1,128 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cloudfoundry + +import ( + "context" + "sync" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/mb" + cfcommon "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" +) + +type ModuleV2 struct { + mb.BaseModule + + log *logp.Logger + + hub *cfcommon.Hub + listener *cfcommon.RlpListener + listenerLock sync.Mutex + + counterReporter mb.PushReporterV2 + valueReporter mb.PushReporterV2 + containerReporter mb.PushReporterV2 +} + +func newModuleV2(base mb.BaseModule, hub *cfcommon.Hub, log *logp.Logger) (mb.Module, error) { + // early check that listener can be created + _, err := hub.RlpListener(cfcommon.RlpListenerCallbacks{}) + if err != nil { + return nil, err + + } + + return &ModuleV2{ + BaseModule: base, + log: log, + hub: hub, + }, nil +} + +func (m *ModuleV2) RunCounterReporter(reporter mb.PushReporterV2) { + m.listenerLock.Lock() + m.runReporters(reporter, m.valueReporter, m.containerReporter) + m.listenerLock.Unlock() + + <-reporter.Done() + + m.listenerLock.Lock() + m.runReporters(nil, m.valueReporter, m.containerReporter) + m.listenerLock.Unlock() +} + +func (m *ModuleV2) RunValueReporter(reporter mb.PushReporterV2) { + m.listenerLock.Lock() + m.runReporters(m.counterReporter, reporter, m.containerReporter) + m.listenerLock.Unlock() + + <-reporter.Done() + + m.listenerLock.Lock() + m.runReporters(m.counterReporter, nil, m.containerReporter) + m.listenerLock.Unlock() +} + +func (m *ModuleV2) RunContainerReporter(reporter mb.PushReporterV2) { + m.listenerLock.Lock() + m.runReporters(m.counterReporter, m.valueReporter, reporter) + m.listenerLock.Unlock() + + <-reporter.Done() + + m.listenerLock.Lock() + m.runReporters(m.counterReporter, m.valueReporter, nil) + m.listenerLock.Unlock() +} + +func (m *ModuleV2) runReporters(counterReporter, valueReporter, containerReporter mb.PushReporterV2) { + if m.listener != nil { + m.listener.Stop() + m.listener = nil + } + m.counterReporter = counterReporter + m.valueReporter = valueReporter + m.containerReporter = containerReporter + + start := false + callbacks := cfcommon.RlpListenerCallbacks{} + if m.counterReporter != nil { + start = true + callbacks.Counter = func(evt *cfcommon.EventCounter) { + m.counterReporter.Event(mb.Event{ + Timestamp: evt.Timestamp(), + RootFields: evt.ToFields(), + }) + } + } + if m.valueReporter != nil { + start = true + callbacks.ValueMetric = func(evt *cfcommon.EventValueMetric) { + m.valueReporter.Event(mb.Event{ + Timestamp: evt.Timestamp(), + RootFields: evt.ToFields(), + }) + } + } + if m.containerReporter != nil { + start = true + callbacks.ContainerMetric = func(evt *cfcommon.EventContainerMetric) { + m.containerReporter.Event(mb.Event{ + Timestamp: evt.Timestamp(), + RootFields: evt.ToFields(), + }) + } + } + if start { + l, err := m.hub.RlpListener(callbacks) + if err != nil { + m.log.Errorf("failed to create RlpListener: %v", err) + return + } + l.Start(context.TODO()) + m.listener = l + } +} diff --git a/x-pack/metricbeat/module/cloudfoundry/value/value.go b/x-pack/metricbeat/module/cloudfoundry/value/value.go index 7a30a2c67db..55cb6ca689a 100644 --- a/x-pack/metricbeat/module/cloudfoundry/value/value.go +++ b/x-pack/metricbeat/module/cloudfoundry/value/value.go @@ -25,14 +25,14 @@ func init() { type MetricSet struct { mb.BaseMetricSet - mod *cloudfoundry.Module + mod cloudfoundry.Module } // New create a new instance of the MetricSet // Part of new is also setting up the configuration by processing additional // configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - mod, ok := base.Module().(*cloudfoundry.Module) + mod, ok := base.Module().(cloudfoundry.Module) if !ok { return nil, fmt.Errorf("must be child of cloudfoundry module") } From 4a68a294a6da3ea1db28301cb795f9f282e4ad83 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 18 Jun 2020 08:54:12 +0200 Subject: [PATCH 03/10] Small refactor in v2 --- x-pack/metricbeat/module/cloudfoundry/v1.go | 3 +++ x-pack/metricbeat/module/cloudfoundry/v2.go | 23 ++++++--------------- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/x-pack/metricbeat/module/cloudfoundry/v1.go b/x-pack/metricbeat/module/cloudfoundry/v1.go index de397ce22eb..f6d1d5ebe58 100644 --- a/x-pack/metricbeat/module/cloudfoundry/v1.go +++ b/x-pack/metricbeat/module/cloudfoundry/v1.go @@ -3,3 +3,6 @@ // you may not use this file except in compliance with the Elastic License. package cloudfoundry + +type ModuleV1 struct { +} diff --git a/x-pack/metricbeat/module/cloudfoundry/v2.go b/x-pack/metricbeat/module/cloudfoundry/v2.go index 5cf7de6c103..f5e117c0e6c 100644 --- a/x-pack/metricbeat/module/cloudfoundry/v2.go +++ b/x-pack/metricbeat/module/cloudfoundry/v2.go @@ -43,42 +43,31 @@ func newModuleV2(base mb.BaseModule, hub *cfcommon.Hub, log *logp.Logger) (mb.Mo } func (m *ModuleV2) RunCounterReporter(reporter mb.PushReporterV2) { - m.listenerLock.Lock() m.runReporters(reporter, m.valueReporter, m.containerReporter) - m.listenerLock.Unlock() + defer m.runReporters(nil, m.valueReporter, m.containerReporter) <-reporter.Done() - - m.listenerLock.Lock() - m.runReporters(nil, m.valueReporter, m.containerReporter) - m.listenerLock.Unlock() } func (m *ModuleV2) RunValueReporter(reporter mb.PushReporterV2) { - m.listenerLock.Lock() m.runReporters(m.counterReporter, reporter, m.containerReporter) - m.listenerLock.Unlock() + defer m.runReporters(m.counterReporter, nil, m.containerReporter) <-reporter.Done() - m.listenerLock.Lock() - m.runReporters(m.counterReporter, nil, m.containerReporter) - m.listenerLock.Unlock() } func (m *ModuleV2) RunContainerReporter(reporter mb.PushReporterV2) { - m.listenerLock.Lock() m.runReporters(m.counterReporter, m.valueReporter, reporter) - m.listenerLock.Unlock() + defer m.runReporters(m.counterReporter, m.valueReporter, nil) <-reporter.Done() - - m.listenerLock.Lock() - m.runReporters(m.counterReporter, m.valueReporter, nil) - m.listenerLock.Unlock() } func (m *ModuleV2) runReporters(counterReporter, valueReporter, containerReporter mb.PushReporterV2) { + m.listenerLock.Lock() + defer m.listenerLock.Unlock() + if m.listener != nil { m.listener.Stop() m.listener = nil From 6eeb6b4e4ca8575113bc210df6c289252b18466b Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 18 Jun 2020 12:11:42 +0200 Subject: [PATCH 04/10] It works --- .../module/cloudfoundry/cloudfoundry.go | 4 +- x-pack/metricbeat/module/cloudfoundry/v1.go | 152 +++++++++++++++ .../metricbeat/module/cloudfoundry/v1_test.go | 181 ++++++++++++++++++ x-pack/metricbeat/module/cloudfoundry/v2.go | 5 +- 4 files changed, 337 insertions(+), 5 deletions(-) create mode 100644 x-pack/metricbeat/module/cloudfoundry/v1_test.go diff --git a/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go b/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go index 90b26762d43..961827469dd 100644 --- a/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go +++ b/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go @@ -37,8 +37,8 @@ func newModule(base mb.BaseModule) (mb.Module, error) { hub := cfcommon.NewHub(&cfg, "metricbeat", log) switch cfg.Version { - //case cfcommon.ConsumerVersionV1: - // return newModuleV1(base, hub, log) + case cfcommon.ConsumerVersionV1: + return newModuleV1(base, hub, log) case cfcommon.ConsumerVersionV2: return newModuleV2(base, hub, log) default: diff --git a/x-pack/metricbeat/module/cloudfoundry/v1.go b/x-pack/metricbeat/module/cloudfoundry/v1.go index f6d1d5ebe58..b6e2853e052 100644 --- a/x-pack/metricbeat/module/cloudfoundry/v1.go +++ b/x-pack/metricbeat/module/cloudfoundry/v1.go @@ -4,5 +4,157 @@ package cloudfoundry +import ( + "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/mb" + cfcommon "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" +) + type ModuleV1 struct { + mb.BaseModule + + log *logp.Logger + + running atomic.Bool + consumer *cfcommon.DopplerConsumer + + events chan cfcommon.Event + subscriptions chan subscription +} + +func newModuleV1(base mb.BaseModule, hub *cfcommon.Hub, log *logp.Logger) (*ModuleV1, error) { + m := ModuleV1{ + BaseModule: base, + log: log, + running: atomic.MakeBool(false), + } + consumer, err := hub.DopplerConsumer(cfcommon.DopplerCallbacks{ + Metric: m.callback, + }) + if err != nil { + return nil, err + } + m.consumer = consumer + m.events = make(chan cfcommon.Event) + m.subscriptions = make(chan subscription) + + return &m, nil +} + +func (m *ModuleV1) RunCounterReporter(reporter mb.PushReporterV2) { + m.subscribe(cfcommon.EventTypeCounter, reporter) + defer m.unsubscribe(cfcommon.EventTypeCounter, reporter) + <-reporter.Done() +} + +func (m *ModuleV1) RunValueReporter(reporter mb.PushReporterV2) { + m.subscribe(cfcommon.EventTypeValueMetric, reporter) + defer m.unsubscribe(cfcommon.EventTypeValueMetric, reporter) + <-reporter.Done() +} + +func (m *ModuleV1) RunContainerReporter(reporter mb.PushReporterV2) { + m.subscribe(cfcommon.EventTypeContainerMetric, reporter) + defer m.unsubscribe(cfcommon.EventTypeContainerMetric, reporter) + <-reporter.Done() +} + +func (m *ModuleV1) subscribe(eventType cfcommon.EventType, reporter mb.PushReporterV2) { + go m.run() + m.subscriptions <- subscription{ + eventType: eventType, + reporter: reporter, + } +} + +func (m *ModuleV1) unsubscribe(eventType cfcommon.EventType, reporter mb.PushReporterV2) { + m.subscriptions <- subscription{ + eventType: eventType, + reporter: reporter, + unsubscribe: true, + } +} + +func (m *ModuleV1) callback(event cfcommon.Event) { + m.events <- event +} + +func (m *ModuleV1) run() { + if !m.running.CAS(false, true) { + return + } + + m.consumer.Run() + defer m.consumer.Stop() + defer func() { m.running.Store(false) }() + + dispatcher := newEventDispatcher(m.log) + + for { + select { + case e := <-m.events: + dispatcher.dispatch(e) + case s := <-m.subscriptions: + dispatcher.handleSubscription(s) + if dispatcher.empty() { + return + } + } + } +} + +type subscription struct { + eventType cfcommon.EventType + reporter mb.PushReporterV2 + + unsubscribe bool +} + +type eventDispatcher struct { + log *logp.Logger + reporters map[cfcommon.EventType]mb.PushReporterV2 +} + +func newEventDispatcher(log *logp.Logger) *eventDispatcher { + return &eventDispatcher{ + log: log, + reporters: make(map[cfcommon.EventType]mb.PushReporterV2), + } +} + +func (d *eventDispatcher) handleSubscription(s subscription) { + current, subscribed := d.reporters[s.eventType] + if s.unsubscribe { + if !subscribed || current != s.reporter { + // This can happen if same metricset is used twice + d.log.Warnf("unsubscribing not subscribed reporter for %s", s.eventType) + return + } + delete(d.reporters, s.eventType) + } else { + if subscribed { + if s.reporter != current { + // This can happen if same metricset is used twice + d.log.Warnf("subscribing multiple reporters for %s", s.eventType) + } + return + } + d.reporters[s.eventType] = s.reporter + } +} + +func (d *eventDispatcher) dispatch(e cfcommon.Event) { + reporter, found := d.reporters[e.EventType()] + if !found { + return + } + reporter.Event(mb.Event{ + Timestamp: e.Timestamp(), + RootFields: e.ToFields(), + }) +} + +func (d *eventDispatcher) empty() bool { + return len(d.reporters) == 0 } diff --git a/x-pack/metricbeat/module/cloudfoundry/v1_test.go b/x-pack/metricbeat/module/cloudfoundry/v1_test.go new file mode 100644 index 00000000000..b30952a9189 --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/v1_test.go @@ -0,0 +1,181 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cloudfoundry + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/mb" + cfcommon "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" +) + +func TestDispatcher(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + log := logp.NewLogger("cloudfoundry") + + assertEventType := func(t *testing.T, expected string, e mb.Event) { + t.Helper() + cf := e.RootFields["cloudfoundry"].(common.MapStr) + assert.Equal(t, expected, cf["type"]) + } + + waitFor := func(t *testing.T, expected string, r pushReporter) { + t.Helper() + select { + case e := <-r.events: + assertEventType(t, expected, e) + default: + t.Errorf("expected %s event", expected) + } + } + + t.Run("subscribe to one type", func(t *testing.T) { + d := newEventDispatcher(log) + r := pushReporter{events: make(chan mb.Event, 1)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &r, + }) + + d.dispatch(&cfcommon.EventCounter{}) + waitFor(t, "counter", r) + }) + + t.Run("subscribe and unsubscribe", func(t *testing.T) { + d := newEventDispatcher(log) + r := pushReporter{events: make(chan mb.Event, 1)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &r, + }) + + d.dispatch(&cfcommon.EventCounter{}) + waitFor(t, "counter", r) + + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &r, + unsubscribe: true, + }) + + assert.True(t, d.empty()) + d.dispatch(&cfcommon.EventCounter{}) + + select { + case <-r.events: + t.Errorf("shouldn't receive on this reporter") + default: + } + }) + + t.Run("subscribe to two types", func(t *testing.T) { + d := newEventDispatcher(log) + + counterReporter := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &counterReporter, + }) + + valueReporter := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeValueMetric, + reporter: &valueReporter, + }) + + d.dispatch(&cfcommon.EventCounter{}) + d.dispatch(&cfcommon.EventValueMetric{}) + + waitFor(t, "counter", counterReporter) + waitFor(t, "value", valueReporter) + }) + + t.Run("subscribe to two types, receive only from one", func(t *testing.T) { + d := newEventDispatcher(log) + + counterReporter := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &counterReporter, + }) + + valueReporter := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeValueMetric, + reporter: &valueReporter, + }) + + d.dispatch(&cfcommon.EventCounter{}) + d.dispatch(&cfcommon.EventCounter{}) + + select { + case <-valueReporter.events: + t.Errorf("shouldn't receive on this reporter") + default: + } + + waitFor(t, "counter", counterReporter) + waitFor(t, "counter", counterReporter) + }) + + t.Run("subscribe twice to same type, ignore second", func(t *testing.T) { + d := newEventDispatcher(log) + first := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &first, + }) + d.dispatch(&cfcommon.EventCounter{}) + waitFor(t, "counter", first) + + second := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &second, + }) + + d.dispatch(&cfcommon.EventCounter{}) + select { + case <-second.events: + t.Errorf("shouldn't receive on this reporter") + default: + } + waitFor(t, "counter", first) + }) + + t.Run("unsubscribe not subscribed reporters, first one continues subscribed", func(t *testing.T) { + d := newEventDispatcher(log) + r := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &r, + }) + d.dispatch(&cfcommon.EventCounter{}) + waitFor(t, "counter", r) + + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &pushReporter{}, + unsubscribe: true, + }) + d.dispatch(&cfcommon.EventCounter{}) + waitFor(t, "counter", r) + }) +} + +type pushReporter struct { + events chan mb.Event +} + +func (r *pushReporter) Done() <-chan struct{} { return nil } +func (r *pushReporter) Error(err error) bool { return true } +func (r *pushReporter) Event(e mb.Event) bool { + r.events <- e + return true +} diff --git a/x-pack/metricbeat/module/cloudfoundry/v2.go b/x-pack/metricbeat/module/cloudfoundry/v2.go index f5e117c0e6c..92abd13a8f3 100644 --- a/x-pack/metricbeat/module/cloudfoundry/v2.go +++ b/x-pack/metricbeat/module/cloudfoundry/v2.go @@ -27,12 +27,11 @@ type ModuleV2 struct { containerReporter mb.PushReporterV2 } -func newModuleV2(base mb.BaseModule, hub *cfcommon.Hub, log *logp.Logger) (mb.Module, error) { - // early check that listener can be created +func newModuleV2(base mb.BaseModule, hub *cfcommon.Hub, log *logp.Logger) (*ModuleV2, error) { + // Early check that listener can be created _, err := hub.RlpListener(cfcommon.RlpListenerCallbacks{}) if err != nil { return nil, err - } return &ModuleV2{ From ed8b6539db058a5e57dc425db847ad945dfdc263 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 18 Jun 2020 12:46:15 +0200 Subject: [PATCH 05/10] Ignore nil error on firehose --- x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go index 8d068a4fd84..10ea50dd928 100644 --- a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go +++ b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go @@ -122,9 +122,11 @@ func (c *DopplerConsumer) firehose(cb func(evt Event), filter consumer.EnvelopeF } cb(event) case err := <-errChan: - // This error is an error on the connection, not a cloud foundry - // error envelope. Firehose should be able to reconnect, so just log it. - c.log.Infof("Error received on firehose: %v", err) + if err != nil { + // This error is an error on the connection, not a cloud foundry + // error envelope. Firehose should be able to reconnect, so just log it. + c.log.Infof("Error received on firehose: %v", err) + } case <-c.stop: return } From ce2448cd3b9c9d4ac5297dcb30968596e5e31588 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 18 Jun 2020 12:51:02 +0200 Subject: [PATCH 06/10] Revert changes in V2 --- x-pack/metricbeat/module/cloudfoundry/v2.go | 28 +++++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/x-pack/metricbeat/module/cloudfoundry/v2.go b/x-pack/metricbeat/module/cloudfoundry/v2.go index 92abd13a8f3..5cf7de6c103 100644 --- a/x-pack/metricbeat/module/cloudfoundry/v2.go +++ b/x-pack/metricbeat/module/cloudfoundry/v2.go @@ -27,11 +27,12 @@ type ModuleV2 struct { containerReporter mb.PushReporterV2 } -func newModuleV2(base mb.BaseModule, hub *cfcommon.Hub, log *logp.Logger) (*ModuleV2, error) { - // Early check that listener can be created +func newModuleV2(base mb.BaseModule, hub *cfcommon.Hub, log *logp.Logger) (mb.Module, error) { + // early check that listener can be created _, err := hub.RlpListener(cfcommon.RlpListenerCallbacks{}) if err != nil { return nil, err + } return &ModuleV2{ @@ -42,31 +43,42 @@ func newModuleV2(base mb.BaseModule, hub *cfcommon.Hub, log *logp.Logger) (*Modu } func (m *ModuleV2) RunCounterReporter(reporter mb.PushReporterV2) { + m.listenerLock.Lock() m.runReporters(reporter, m.valueReporter, m.containerReporter) - defer m.runReporters(nil, m.valueReporter, m.containerReporter) + m.listenerLock.Unlock() <-reporter.Done() + + m.listenerLock.Lock() + m.runReporters(nil, m.valueReporter, m.containerReporter) + m.listenerLock.Unlock() } func (m *ModuleV2) RunValueReporter(reporter mb.PushReporterV2) { + m.listenerLock.Lock() m.runReporters(m.counterReporter, reporter, m.containerReporter) - defer m.runReporters(m.counterReporter, nil, m.containerReporter) + m.listenerLock.Unlock() <-reporter.Done() + m.listenerLock.Lock() + m.runReporters(m.counterReporter, nil, m.containerReporter) + m.listenerLock.Unlock() } func (m *ModuleV2) RunContainerReporter(reporter mb.PushReporterV2) { + m.listenerLock.Lock() m.runReporters(m.counterReporter, m.valueReporter, reporter) - defer m.runReporters(m.counterReporter, m.valueReporter, nil) + m.listenerLock.Unlock() <-reporter.Done() -} -func (m *ModuleV2) runReporters(counterReporter, valueReporter, containerReporter mb.PushReporterV2) { m.listenerLock.Lock() - defer m.listenerLock.Unlock() + m.runReporters(m.counterReporter, m.valueReporter, nil) + m.listenerLock.Unlock() +} +func (m *ModuleV2) runReporters(counterReporter, valueReporter, containerReporter mb.PushReporterV2) { if m.listener != nil { m.listener.Stop() m.listener = nil From 4a8465b5b07f2f589efa11a7d9f8dba0a55f1426 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 18 Jun 2020 13:34:01 +0200 Subject: [PATCH 07/10] Add changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9a3e872b16d..258a7bd8bea 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -500,6 +500,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add memory metrics into compute googlecloud. {pull}18802[18802] - Add new fields to HAProxy module. {issue}18523[18523] - Add Tomcat overview dashboard {pull}14026[14026] +- Add support for v1 consumer API in Cloud Foundry module, use it by default. {pull}19268[19268] *Packetbeat* From f2f516998573eda13c81802a08fae21dfbbba260 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 18 Jun 2020 13:42:44 +0200 Subject: [PATCH 08/10] Add option to docs --- metricbeat/docs/modules/cloudfoundry.asciidoc | 9 ++++++++- x-pack/metricbeat/metricbeat.reference.yml | 1 + .../module/cloudfoundry/_meta/config.reference.yml | 1 + .../metricbeat/module/cloudfoundry/_meta/docs.asciidoc | 8 +++++++- 4 files changed, 17 insertions(+), 2 deletions(-) diff --git a/metricbeat/docs/modules/cloudfoundry.asciidoc b/metricbeat/docs/modules/cloudfoundry.asciidoc index 0baeb3f4180..3a8ba132280 100644 --- a/metricbeat/docs/modules/cloudfoundry.asciidoc +++ b/metricbeat/docs/modules/cloudfoundry.asciidoc @@ -85,7 +85,7 @@ The URL of the Cloud Foundry UAA API. Optional. Default: "(value from ${api_addr [float] === `rlp_address` -The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(value from ${api_address}/v2/info)". +The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(`log-stream` subdomain under the same domain as `api_server`)". [float] === `client_id` @@ -103,6 +103,12 @@ Client Secret to authenticate with Cloud Foundry. Default: "". Shard ID for connection to the RLP Gateway. Use the same ID across multiple {beatname_lc} to shard the load of events from the RLP Gateway. Default: "(generated UUID)". +[float] +==== `version` + +Consumer API version to connect with Cloud Foundry to collect events. Use `v1` to collect events using Doppler/Traffic Control. +Use `v2` to collect events from the RLP Gateway. Default: "`v1`". + [float] === `ssl` @@ -130,6 +136,7 @@ metricbeat.modules: rlp_address: '${CLOUDFOUNDRY_RLP_ADDRESS:""}' client_id: '${CLOUDFOUNDRY_CLIENT_ID:""}' client_secret: '${CLOUDFOUNDRY_CLIENT_SECRET:""}' + version: v1 ---- [float] diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index e4bf6b13d63..5b5013a4bc3 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -372,6 +372,7 @@ metricbeat.modules: rlp_address: '${CLOUDFOUNDRY_RLP_ADDRESS:""}' client_id: '${CLOUDFOUNDRY_CLIENT_ID:""}' client_secret: '${CLOUDFOUNDRY_CLIENT_SECRET:""}' + version: v1 #----------------------------- CockroachDB Module ----------------------------- - module: cockroachdb diff --git a/x-pack/metricbeat/module/cloudfoundry/_meta/config.reference.yml b/x-pack/metricbeat/module/cloudfoundry/_meta/config.reference.yml index c157d5deeff..be15db23b65 100644 --- a/x-pack/metricbeat/module/cloudfoundry/_meta/config.reference.yml +++ b/x-pack/metricbeat/module/cloudfoundry/_meta/config.reference.yml @@ -10,3 +10,4 @@ rlp_address: '${CLOUDFOUNDRY_RLP_ADDRESS:""}' client_id: '${CLOUDFOUNDRY_CLIENT_ID:""}' client_secret: '${CLOUDFOUNDRY_CLIENT_SECRET:""}' + version: v1 diff --git a/x-pack/metricbeat/module/cloudfoundry/_meta/docs.asciidoc b/x-pack/metricbeat/module/cloudfoundry/_meta/docs.asciidoc index 762d3e4f34f..b36a41bf891 100644 --- a/x-pack/metricbeat/module/cloudfoundry/_meta/docs.asciidoc +++ b/x-pack/metricbeat/module/cloudfoundry/_meta/docs.asciidoc @@ -75,7 +75,7 @@ The URL of the Cloud Foundry UAA API. Optional. Default: "(value from ${api_addr [float] === `rlp_address` -The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(value from ${api_address}/v2/info)". +The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(`log-stream` subdomain under the same domain as `api_server`)". [float] === `client_id` @@ -93,6 +93,12 @@ Client Secret to authenticate with Cloud Foundry. Default: "". Shard ID for connection to the RLP Gateway. Use the same ID across multiple {beatname_lc} to shard the load of events from the RLP Gateway. Default: "(generated UUID)". +[float] +==== `version` + +Consumer API version to connect with Cloud Foundry to collect events. Use `v1` to collect events using Doppler/Traffic Control. +Use `v2` to collect events from the RLP Gateway. Default: "`v1`". + [float] === `ssl` From c444587a695c6fe2c956c231885bc03e32a2a5e7 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Tue, 23 Jun 2020 20:45:37 +0200 Subject: [PATCH 09/10] Add some more comments and improve log message --- x-pack/metricbeat/module/cloudfoundry/v1.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/x-pack/metricbeat/module/cloudfoundry/v1.go b/x-pack/metricbeat/module/cloudfoundry/v1.go index b6e2853e052..e3e65230ac5 100644 --- a/x-pack/metricbeat/module/cloudfoundry/v1.go +++ b/x-pack/metricbeat/module/cloudfoundry/v1.go @@ -92,6 +92,8 @@ func (m *ModuleV1) run() { dispatcher := newEventDispatcher(m.log) for { + // Handle subscriptions and events dispatching on the same + // goroutine so locking is not needed. select { case e := <-m.events: dispatcher.dispatch(e) @@ -111,6 +113,8 @@ type subscription struct { unsubscribe bool } +// eventDispatcher keeps track on the reporters that are subscribed to each event type +// and dispatches events to them when received. type eventDispatcher struct { log *logp.Logger reporters map[cfcommon.EventType]mb.PushReporterV2 @@ -128,7 +132,7 @@ func (d *eventDispatcher) handleSubscription(s subscription) { if s.unsubscribe { if !subscribed || current != s.reporter { // This can happen if same metricset is used twice - d.log.Warnf("unsubscribing not subscribed reporter for %s", s.eventType) + d.log.Warnf("Ignoring unsubscription of not subscribed reporter for %s", s.eventType) return } delete(d.reporters, s.eventType) @@ -136,7 +140,7 @@ func (d *eventDispatcher) handleSubscription(s subscription) { if subscribed { if s.reporter != current { // This can happen if same metricset is used twice - d.log.Warnf("subscribing multiple reporters for %s", s.eventType) + d.log.Warnf("Ignoring subscription of multiple reporters for %s", s.eventType) } return } From 0dde02f0d32b46c007ea530575b9793e1b6c1082 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Tue, 23 Jun 2020 20:47:19 +0200 Subject: [PATCH 10/10] Fix defer order --- x-pack/metricbeat/module/cloudfoundry/v1.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/metricbeat/module/cloudfoundry/v1.go b/x-pack/metricbeat/module/cloudfoundry/v1.go index e3e65230ac5..7d9daf24673 100644 --- a/x-pack/metricbeat/module/cloudfoundry/v1.go +++ b/x-pack/metricbeat/module/cloudfoundry/v1.go @@ -84,10 +84,10 @@ func (m *ModuleV1) run() { if !m.running.CAS(false, true) { return } + defer func() { m.running.Store(false) }() m.consumer.Run() defer m.consumer.Stop() - defer func() { m.running.Store(false) }() dispatcher := newEventDispatcher(m.log)