diff --git a/cmd/flowlogs-pipeline/main.go b/cmd/flowlogs-pipeline/main.go index eb618bb63..a2fa5246e 100644 --- a/cmd/flowlogs-pipeline/main.go +++ b/cmd/flowlogs-pipeline/main.go @@ -19,7 +19,6 @@ package main import ( "context" - "crypto/tls" "encoding/json" "fmt" "net/http" @@ -35,7 +34,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" - "github.com/prometheus/client_golang/prometheus" + "github.com/netobserv/flowlogs-pipeline/pkg/prometheus" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -182,27 +181,7 @@ func run() { // Setup (threads) exit manager utils.SetupElegantExit() - - // set up private prometheus registry - if cfg.MetricsSettings.SuppressGoMetrics { - reg := prometheus.NewRegistry() - prometheus.DefaultRegisterer = reg - prometheus.DefaultGatherer = reg - } - - // create prometheus server for operational metrics - // if value of address is empty, then by default it will take 0.0.0.0 - addr := fmt.Sprintf("%s:%v", cfg.MetricsSettings.Address, cfg.MetricsSettings.Port) - log.Infof("startServer: addr = %s", addr) - promServer := &http.Server{ - Addr: addr, - // TLS clients must use TLS 1.2 or higher - TLSConfig: &tls.Config{ - MinVersion: tls.VersionTLS12, - }, - } - tlsConfig := cfg.MetricsSettings.TLS - go utils.StartPromServer(tlsConfig, promServer, !cfg.MetricsSettings.NoPanic, prometheus.DefaultGatherer.(*prometheus.Registry)) + promServer := prometheus.InitializePrometheus(&cfg.MetricsSettings) // Create new flows pipeline mainPipeline, err = pipeline.NewPipeline(&cfg) @@ -225,7 +204,9 @@ func run() { // Starts the flows pipeline mainPipeline.Run() - _ = promServer.Shutdown(context.Background()) + if promServer != nil { + _ = promServer.Shutdown(context.Background()) + } // Give all threads a chance to exit and then exit the process time.Sleep(time.Second) diff --git a/pkg/confgen/flowlogs2metrics_config.go b/pkg/confgen/flowlogs2metrics_config.go index 8582a04e3..c31635cda 100644 --- a/pkg/confgen/flowlogs2metrics_config.go +++ b/pkg/confgen/flowlogs2metrics_config.go @@ -63,15 +63,13 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() *config.ConfigFileStruct { if cg.config.Write.Loki != nil { forkedNode.WriteLoki("write_loki", *cg.config.Write.Loki) } - return &config.ConfigFileStruct{ - LogLevel: "error", - Pipeline: pipeline.GetStages(), - Parameters: pipeline.GetStageParams(), + return pipeline.IntoConfigFileStruct(&config.ConfigFileStruct{ + LogLevel: "error", MetricsSettings: config.MetricsSettings{ PromConnectionInfo: api.PromConnectionInfo{Port: 9102}, Prefix: "flp_op_", }, - } + }) } func (cg *ConfGen) GenerateTruncatedConfig() []config.StageParam { diff --git a/pkg/config/config.go b/pkg/config/config.go index bbd18b850..d2c0242cc 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -56,9 +56,10 @@ type Profile struct { // will help configuring common setting for all PromEncode stages - PromEncode settings would then act as overrides. type MetricsSettings struct { api.PromConnectionInfo - Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix for names of the operational metrics"` - NoPanic bool `yaml:"noPanic,omitempty" json:"noPanic,omitempty"` - SuppressGoMetrics bool `yaml:"suppressGoMetrics,omitempty" json:"suppressGoMetrics,omitempty" doc:"filter out Go and process metrics"` + DisableGlobalServer bool `yaml:"disableGlobalServer,omitempty" json:"disableGlobalServer,omitempty" doc:"disabling the global metrics server makes operational metrics unavailable. If prometheus-encoding stages are defined, they need to contain their own metrics server parameters."` + Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix for names of the operational metrics"` + NoPanic bool `yaml:"noPanic,omitempty" json:"noPanic,omitempty"` + SuppressGoMetrics bool `yaml:"suppressGoMetrics,omitempty" json:"suppressGoMetrics,omitempty" doc:"filter out Go and process metrics"` } // PerfSettings allows setting some internal configuration parameters diff --git a/pkg/config/pipeline_builder.go b/pkg/config/pipeline_builder.go index f52dbc95e..796da0631 100644 --- a/pkg/config/pipeline_builder.go +++ b/pkg/config/pipeline_builder.go @@ -48,6 +48,8 @@ type PipelineBuilderStage struct { pipeline *pipeline } +const PresetIngesterStage = "preset-ingester" + // NewPipeline creates a new pipeline from an existing ingest func NewPipeline(name string, ingest *Ingest) (PipelineBuilderStage, error) { if ingest.Collector != nil { @@ -89,6 +91,15 @@ func NewKafkaPipeline(name string, ingest api.IngestKafka) PipelineBuilderStage return PipelineBuilderStage{pipeline: &p, lastStage: name} } +// NewPresetIngesterPipeline creates a new partial pipeline without ingest stage +func NewPresetIngesterPipeline() PipelineBuilderStage { + p := pipeline{ + stages: []Stage{}, + config: []StageParam{}, + } + return PipelineBuilderStage{pipeline: &p, lastStage: PresetIngesterStage} +} + func (b *PipelineBuilderStage) next(name string, param StageParam) PipelineBuilderStage { b.pipeline.stages = append(b.pipeline.stages, Stage{Name: name, Follows: b.lastStage}) b.pipeline.config = append(b.pipeline.config, param) @@ -164,3 +175,15 @@ func (b *PipelineBuilderStage) GetStages() []Stage { func (b *PipelineBuilderStage) GetStageParams() []StageParam { return b.pipeline.config } + +// IntoConfigFileStruct injects the current pipeline and params in the provided ConfigFileStruct object. +func (b *PipelineBuilderStage) IntoConfigFileStruct(cfs *ConfigFileStruct) *ConfigFileStruct { + cfs.Pipeline = b.GetStages() + cfs.Parameters = b.GetStageParams() + return cfs +} + +// ToConfigFileStruct returns the current pipeline and params as a new ConfigFileStruct object. +func (b *PipelineBuilderStage) ToConfigFileStruct() *ConfigFileStruct { + return b.IntoConfigFileStruct(&ConfigFileStruct{}) +} diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index f4812140b..a33886633 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -18,9 +18,7 @@ package encode import ( - "crypto/tls" "fmt" - "net/http" "strings" "time" @@ -28,6 +26,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" + promserver "github.com/netobserv/flowlogs-pipeline/pkg/prometheus" "github.com/netobserv/flowlogs-pipeline/pkg/utils" "github.com/prometheus/client_golang/prometheus" @@ -279,17 +278,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En if cfg.PromConnectionInfo != nil { registry := prometheus.NewRegistry() registerer = registry - addr := fmt.Sprintf("%s:%v", cfg.PromConnectionInfo.Address, cfg.PromConnectionInfo.Port) - log.Infof("startServer: addr = %s", addr) - promServer := &http.Server{ - Addr: addr, - // TLS clients must use TLS 1.2 or higher - TLSConfig: &tls.Config{ - MinVersion: tls.VersionTLS12, - }, - } - tlsConfig := cfg.PromConnectionInfo.TLS - go putils.StartPromServer(tlsConfig, promServer, true, registry) + promserver.StartServerAsync(cfg.PromConnectionInfo, nil) } else { registerer = prometheus.DefaultRegisterer } diff --git a/pkg/pipeline/ingest/ingest_inprocess.go b/pkg/pipeline/ingest/ingest_inprocess.go new file mode 100644 index 000000000..266b26247 --- /dev/null +++ b/pkg/pipeline/ingest/ingest_inprocess.go @@ -0,0 +1,29 @@ +package ingest + +import ( + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" +) + +// InProcess ingester is meant to be imported and used from another program +// via pipeline.StartFLPInProcess +type InProcess struct { + in chan config.GenericMap +} + +func NewInProcess(in chan config.GenericMap) *InProcess { + return &InProcess{in: in} +} + +func (d *InProcess) Ingest(out chan<- config.GenericMap) { + go func() { + <-utils.ExitChannel() + d.Close() + }() + for rec := range d.in { + out <- rec + } +} + +func (d *InProcess) Close() { +} diff --git a/pkg/pipeline/inprocess.go b/pkg/pipeline/inprocess.go new file mode 100644 index 000000000..9b578610b --- /dev/null +++ b/pkg/pipeline/inprocess.go @@ -0,0 +1,32 @@ +package pipeline + +import ( + "context" + "fmt" + + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest" + "github.com/netobserv/flowlogs-pipeline/pkg/prometheus" +) + +// StartFLPInProcess is an entry point to start the whole FLP / pipeline processing from imported code +func StartFLPInProcess(cfg *config.ConfigFileStruct, in chan config.GenericMap) error { + promServer := prometheus.InitializePrometheus(&cfg.MetricsSettings) + + // Create new flows pipeline + ingester := ingest.NewInProcess(in) + flp, err := newPipelineFromIngester(cfg, ingester) + if err != nil { + return fmt.Errorf("failed to initialize pipeline %w", err) + } + + // Starts the flows pipeline; blocking call + go func() { + flp.Run() + if promServer != nil { + _ = promServer.Shutdown(context.Background()) + } + }() + + return nil +} diff --git a/pkg/pipeline/inprocess_test.go b/pkg/pipeline/inprocess_test.go new file mode 100644 index 000000000..5bed5f3df --- /dev/null +++ b/pkg/pipeline/inprocess_test.go @@ -0,0 +1,55 @@ +package pipeline + +import ( + "bufio" + "encoding/json" + "os" + "testing" + "time" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestInProcessFLP(t *testing.T) { + pipeline := config.NewPresetIngesterPipeline() + pipeline = pipeline.WriteStdout("writer", api.WriteStdout{Format: "json"}) + in := make(chan config.GenericMap, 100) + defer close(in) + err := StartFLPInProcess(pipeline.ToConfigFileStruct(), in) + require.NoError(t, err) + + capturedOut, w, _ := os.Pipe() + old := os.Stdout + os.Stdout = w + defer func() { + os.Stdout = old + }() + + // yield thread to allow pipe services correctly start + time.Sleep(10 * time.Millisecond) + + in <- config.GenericMap{ + "SrcAddr": "1.2.3.4", + "DstAddr": "5.6.7.8", + "Dscp": float64(1), + "DstMac": "11:22:33:44:55:66", + "SrcMac": "01:02:03:04:05:06", + } + + scanner := bufio.NewScanner(capturedOut) + require.True(t, scanner.Scan()) + capturedRecord := map[string]interface{}{} + bytes := scanner.Bytes() + require.NoError(t, json.Unmarshal(bytes, &capturedRecord), string(bytes)) + + assert.EqualValues(t, map[string]interface{}{ + "SrcAddr": "1.2.3.4", + "DstAddr": "5.6.7.8", + "Dscp": float64(1), + "DstMac": "11:22:33:44:55:66", + "SrcMac": "01:02:03:04:05:06", + }, capturedRecord) +} diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index e9e923116..12cadfce5 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -22,6 +22,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest" "github.com/netobserv/gopipes/pkg/node" log "github.com/sirupsen/logrus" ) @@ -48,18 +49,24 @@ type Pipeline struct { // NewPipeline defines the pipeline elements func NewPipeline(cfg *config.ConfigFileStruct) (*Pipeline, error) { - log.Debugf("entering NewPipeline") + return newPipelineFromIngester(cfg, nil) +} + +// newPipelineFromIngester defines the pipeline elements from a preset ingester (e.g. for in-process receiver) +func newPipelineFromIngester(cfg *config.ConfigFileStruct, ing ingest.Ingester) (*Pipeline, error) { + log.Debugf("entering newPipelineFromIngester") - stages := cfg.Pipeline - log.Debugf("stages = %v ", stages) - configParams := cfg.Parameters - log.Debugf("configParams = %v ", configParams) + log.Debugf("stages = %v ", cfg.Pipeline) + log.Debugf("configParams = %v ", cfg.Parameters) - build := newBuilder(cfg) - if err := build.readStages(); err != nil { + builder := newBuilder(cfg) + if ing != nil { + builder.presetIngester(ing) + } + if err := builder.readStages(); err != nil { return nil, err } - return build.build() + return builder.build() } func (p *Pipeline) Run() { diff --git a/pkg/pipeline/pipeline_builder.go b/pkg/pipeline/pipeline_builder.go index ac752e645..a3141a6f2 100644 --- a/pkg/pipeline/pipeline_builder.go +++ b/pkg/pipeline/pipeline_builder.go @@ -98,6 +98,17 @@ func newBuilder(cfg *config.ConfigFileStruct) *builder { } } +// use a preset ingester +func (b *builder) presetIngester(ing ingest.Ingester) { + name := config.PresetIngesterStage + log.Debugf("stage = %v", name) + b.appendEntry(pipelineEntry{ + stageName: name, + stageType: StageIngest, + Ingester: ing, + }) +} + // read the configuration stages definition and instantiate the corresponding native Go objects func (b *builder) readStages() error { for _, param := range b.configParams { @@ -124,14 +135,18 @@ func (b *builder) readStages() error { if err != nil { return err } - b.pipelineEntryMap[param.Name] = &pEntry - b.pipelineStages = append(b.pipelineStages, &pEntry) - log.Debugf("pipeline = %v", b.pipelineStages) + b.appendEntry(pEntry) } log.Debugf("pipeline = %v", b.pipelineStages) return nil } +func (b *builder) appendEntry(pEntry pipelineEntry) { + b.pipelineEntryMap[pEntry.stageName] = &pEntry + b.pipelineStages = append(b.pipelineStages, &pEntry) + log.Debugf("pipeline = %v", b.pipelineStages) +} + // reads the configured Go stages and connects between them // readStages must be invoked before this func (b *builder) build() (*Pipeline, error) { diff --git a/pkg/pipeline/utils/prom_server.go b/pkg/pipeline/utils/prom_server.go deleted file mode 100644 index 17d0e8c7d..000000000 --- a/pkg/pipeline/utils/prom_server.go +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (C) 2023 IBM, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package utils - -import ( - "net/http" - "os" - - "github.com/netobserv/flowlogs-pipeline/pkg/api" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/sirupsen/logrus" -) - -// StartPromServer listens for prometheus resource usage requests -func StartPromServer(tlsConfig *api.PromTLSConf, server *http.Server, panicOnError bool, reg *prometheus.Registry) { - logrus.Debugf("entering StartPromServer") - - // The Handler function provides a default handler to expose metrics - // via an HTTP server. "/metrics" is the usual endpoint for that. - mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) - server.Handler = mux - - var err error - if tlsConfig != nil { - err = server.ListenAndServeTLS(tlsConfig.CertPath, tlsConfig.KeyPath) - } else { - err = server.ListenAndServe() - } - if err != nil && err != http.ErrServerClosed { - logrus.Errorf("error in http.ListenAndServe: %v", err) - if panicOnError { - os.Exit(1) - } - } -} diff --git a/pkg/prometheus/prom_server.go b/pkg/prometheus/prom_server.go new file mode 100644 index 000000000..d40000bc0 --- /dev/null +++ b/pkg/prometheus/prom_server.go @@ -0,0 +1,96 @@ +/* + * Copyright (C) 2023 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package prometheus + +import ( + "crypto/tls" + "fmt" + "net/http" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + prom "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" +) + +var ( + plog = logrus.WithField("component", "prometheus") + maybePanic = plog.Fatalf +) + +// InitializePrometheus starts the global Prometheus server, used for operational metrics and prom-encode stages if they don't override the server settings +func InitializePrometheus(settings *config.MetricsSettings) *http.Server { + if settings.NoPanic { + maybePanic = plog.Errorf + } + if settings.DisableGlobalServer { + plog.Info("Disabled global Prometheus server - no operational metrics will be available") + return nil + } + if settings.SuppressGoMetrics { + // set up private prometheus registry + r := prom.NewRegistry() + prom.DefaultRegisterer = r + prom.DefaultGatherer = r + } + return StartServerAsync(&settings.PromConnectionInfo, nil) +} + +// StartServerAsync listens for prometheus resource usage requests +func StartServerAsync(conn *api.PromConnectionInfo, registry *prom.Registry) *http.Server { + // create prometheus server for operational metrics + // if value of address is empty, then by default it will take 0.0.0.0 + port := conn.Port + if port == 0 { + port = 9090 + } + addr := fmt.Sprintf("%s:%v", conn.Address, port) + plog.Infof("StartServerAsync: addr = %s", addr) + + httpServer := http.Server{ + Addr: addr, + // TLS clients must use TLS 1.2 or higher + TLSConfig: &tls.Config{ + MinVersion: tls.VersionTLS12, + }, + } + // The Handler function provides a default handler to expose metrics + // via an HTTP server. "/metrics" is the usual endpoint for that. + mux := http.NewServeMux() + if registry == nil { + mux.Handle("/metrics", promhttp.Handler()) + } else { + mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{})) + } + httpServer.Handler = mux + + go func() { + var err error + if conn.TLS != nil { + err = httpServer.ListenAndServeTLS(conn.TLS.CertPath, conn.TLS.KeyPath) + } else { + err = httpServer.ListenAndServe() + } + if err != nil && err != http.ErrServerClosed { + maybePanic("error in http.ListenAndServe: %v", err) + } + }() + + return &httpServer +}