Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to start FLP directly from the flow logs producer #538

Merged
merged 5 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 5 additions & 24 deletions cmd/flowlogs-pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package main

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
Expand All @@ -35,7 +34,7 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/prometheus/client_golang/prometheus"
"github.com/netobserv/flowlogs-pipeline/pkg/prometheus"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -182,27 +181,7 @@ func run() {

// Setup (threads) exit manager
utils.SetupElegantExit()

// set up private prometheus registry
if cfg.MetricsSettings.SuppressGoMetrics {
reg := prometheus.NewRegistry()
prometheus.DefaultRegisterer = reg
prometheus.DefaultGatherer = reg
}

// create prometheus server for operational metrics
// if value of address is empty, then by default it will take 0.0.0.0
addr := fmt.Sprintf("%s:%v", cfg.MetricsSettings.Address, cfg.MetricsSettings.Port)
log.Infof("startServer: addr = %s", addr)
promServer := &http.Server{
Addr: addr,
// TLS clients must use TLS 1.2 or higher
TLSConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
}
tlsConfig := cfg.MetricsSettings.TLS
go utils.StartPromServer(tlsConfig, promServer, !cfg.MetricsSettings.NoPanic, prometheus.DefaultGatherer.(*prometheus.Registry))
promServer := prometheus.InitializePrometheus(&cfg.MetricsSettings)

// Create new flows pipeline
mainPipeline, err = pipeline.NewPipeline(&cfg)
Expand All @@ -225,7 +204,9 @@ func run() {
// Starts the flows pipeline
mainPipeline.Run()

_ = promServer.Shutdown(context.Background())
if promServer != nil {
_ = promServer.Shutdown(context.Background())
}

// Give all threads a chance to exit and then exit the process
time.Sleep(time.Second)
Expand Down
8 changes: 3 additions & 5 deletions pkg/confgen/flowlogs2metrics_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,13 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() *config.ConfigFileStruct {
if cg.config.Write.Loki != nil {
forkedNode.WriteLoki("write_loki", *cg.config.Write.Loki)
}
return &config.ConfigFileStruct{
LogLevel: "error",
Pipeline: pipeline.GetStages(),
Parameters: pipeline.GetStageParams(),
return pipeline.IntoConfigFileStruct(&config.ConfigFileStruct{
LogLevel: "error",
MetricsSettings: config.MetricsSettings{
PromConnectionInfo: api.PromConnectionInfo{Port: 9102},
Prefix: "flp_op_",
},
}
})
}

func (cg *ConfGen) GenerateTruncatedConfig() []config.StageParam {
Expand Down
7 changes: 4 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ type Profile struct {
// will help configuring common setting for all PromEncode stages - PromEncode settings would then act as overrides.
type MetricsSettings struct {
api.PromConnectionInfo
Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix for names of the operational metrics"`
NoPanic bool `yaml:"noPanic,omitempty" json:"noPanic,omitempty"`
SuppressGoMetrics bool `yaml:"suppressGoMetrics,omitempty" json:"suppressGoMetrics,omitempty" doc:"filter out Go and process metrics"`
DisableGlobalServer bool `yaml:"disableGlobalServer,omitempty" json:"disableGlobalServer,omitempty" doc:"disabling the global metrics server makes operational metrics unavailable. If prometheus-encoding stages are defined, they need to contain their own metrics server parameters."`
Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix for names of the operational metrics"`
NoPanic bool `yaml:"noPanic,omitempty" json:"noPanic,omitempty"`
SuppressGoMetrics bool `yaml:"suppressGoMetrics,omitempty" json:"suppressGoMetrics,omitempty" doc:"filter out Go and process metrics"`
}

// PerfSettings allows setting some internal configuration parameters
Expand Down
23 changes: 23 additions & 0 deletions pkg/config/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type PipelineBuilderStage struct {
pipeline *pipeline
}

const PresetIngesterStage = "preset-ingester"

// NewPipeline creates a new pipeline from an existing ingest
func NewPipeline(name string, ingest *Ingest) (PipelineBuilderStage, error) {
if ingest.Collector != nil {
Expand Down Expand Up @@ -89,6 +91,15 @@ func NewKafkaPipeline(name string, ingest api.IngestKafka) PipelineBuilderStage
return PipelineBuilderStage{pipeline: &p, lastStage: name}
}

// NewPresetIngesterPipeline creates a new partial pipeline without ingest stage
func NewPresetIngesterPipeline() PipelineBuilderStage {
p := pipeline{
stages: []Stage{},
config: []StageParam{},
}
return PipelineBuilderStage{pipeline: &p, lastStage: PresetIngesterStage}
}

func (b *PipelineBuilderStage) next(name string, param StageParam) PipelineBuilderStage {
b.pipeline.stages = append(b.pipeline.stages, Stage{Name: name, Follows: b.lastStage})
b.pipeline.config = append(b.pipeline.config, param)
Expand Down Expand Up @@ -164,3 +175,15 @@ func (b *PipelineBuilderStage) GetStages() []Stage {
func (b *PipelineBuilderStage) GetStageParams() []StageParam {
return b.pipeline.config
}

// IntoConfigFileStruct injects the current pipeline and params in the provided ConfigFileStruct object.
func (b *PipelineBuilderStage) IntoConfigFileStruct(cfs *ConfigFileStruct) *ConfigFileStruct {
cfs.Pipeline = b.GetStages()
cfs.Parameters = b.GetStageParams()
return cfs
}

// ToConfigFileStruct returns the current pipeline and params as a new ConfigFileStruct object.
func (b *PipelineBuilderStage) ToConfigFileStruct() *ConfigFileStruct {
return b.IntoConfigFileStruct(&ConfigFileStruct{})
}
15 changes: 2 additions & 13 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@
package encode

import (
"crypto/tls"
"fmt"
"net/http"
"strings"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
promserver "github.com/netobserv/flowlogs-pipeline/pkg/prometheus"
"github.com/netobserv/flowlogs-pipeline/pkg/utils"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -279,17 +278,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
if cfg.PromConnectionInfo != nil {
registry := prometheus.NewRegistry()
registerer = registry
addr := fmt.Sprintf("%s:%v", cfg.PromConnectionInfo.Address, cfg.PromConnectionInfo.Port)
log.Infof("startServer: addr = %s", addr)
promServer := &http.Server{
Addr: addr,
// TLS clients must use TLS 1.2 or higher
TLSConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
}
tlsConfig := cfg.PromConnectionInfo.TLS
go putils.StartPromServer(tlsConfig, promServer, true, registry)
promserver.StartServerAsync(cfg.PromConnectionInfo, nil)
} else {
registerer = prometheus.DefaultRegisterer
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/pipeline/ingest/ingest_inprocess.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package ingest

import (
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
)

// InProcess ingester is meant to be imported and used from another program
// via pipeline.StartFLPInProcess
type InProcess struct {
in chan config.GenericMap
}

func NewInProcess(in chan config.GenericMap) *InProcess {
return &InProcess{in: in}
}

func (d *InProcess) Ingest(out chan<- config.GenericMap) {
go func() {
<-utils.ExitChannel()
d.Close()
}()
for rec := range d.in {
out <- rec
}
}

func (d *InProcess) Close() {
}
32 changes: 32 additions & 0 deletions pkg/pipeline/inprocess.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package pipeline

import (
"context"
"fmt"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest"
"github.com/netobserv/flowlogs-pipeline/pkg/prometheus"
)

// StartFLPInProcess is an entry point to start the whole FLP / pipeline processing from imported code
func StartFLPInProcess(cfg *config.ConfigFileStruct, in chan config.GenericMap) error {
promServer := prometheus.InitializePrometheus(&cfg.MetricsSettings)

// Create new flows pipeline
ingester := ingest.NewInProcess(in)
flp, err := newPipelineFromIngester(cfg, ingester)
if err != nil {
return fmt.Errorf("failed to initialize pipeline %w", err)
}

// Starts the flows pipeline; blocking call
go func() {
flp.Run()
if promServer != nil {
_ = promServer.Shutdown(context.Background())
}
}()

return nil
}
55 changes: 55 additions & 0 deletions pkg/pipeline/inprocess_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package pipeline

import (
"bufio"
"encoding/json"
"os"
"testing"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestInProcessFLP(t *testing.T) {
pipeline := config.NewPresetIngesterPipeline()
pipeline = pipeline.WriteStdout("writer", api.WriteStdout{Format: "json"})
in := make(chan config.GenericMap, 100)
defer close(in)
err := StartFLPInProcess(pipeline.ToConfigFileStruct(), in)
require.NoError(t, err)

capturedOut, w, _ := os.Pipe()
old := os.Stdout
os.Stdout = w
defer func() {
os.Stdout = old
}()

// yield thread to allow pipe services correctly start
time.Sleep(10 * time.Millisecond)

in <- config.GenericMap{
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
"Dscp": float64(1),
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
}

scanner := bufio.NewScanner(capturedOut)
require.True(t, scanner.Scan())
capturedRecord := map[string]interface{}{}
bytes := scanner.Bytes()
require.NoError(t, json.Unmarshal(bytes, &capturedRecord), string(bytes))

assert.EqualValues(t, map[string]interface{}{
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
"Dscp": float64(1),
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
}, capturedRecord)
}
23 changes: 15 additions & 8 deletions pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest"
"github.com/netobserv/gopipes/pkg/node"
log "github.com/sirupsen/logrus"
)
Expand All @@ -48,18 +49,24 @@ type Pipeline struct {

// NewPipeline defines the pipeline elements
func NewPipeline(cfg *config.ConfigFileStruct) (*Pipeline, error) {
log.Debugf("entering NewPipeline")
return newPipelineFromIngester(cfg, nil)
}

// newPipelineFromIngester defines the pipeline elements from a preset ingester (e.g. for in-process receiver)
func newPipelineFromIngester(cfg *config.ConfigFileStruct, ing ingest.Ingester) (*Pipeline, error) {
log.Debugf("entering newPipelineFromIngester")

stages := cfg.Pipeline
log.Debugf("stages = %v ", stages)
configParams := cfg.Parameters
log.Debugf("configParams = %v ", configParams)
log.Debugf("stages = %v ", cfg.Pipeline)
log.Debugf("configParams = %v ", cfg.Parameters)

build := newBuilder(cfg)
if err := build.readStages(); err != nil {
builder := newBuilder(cfg)
if ing != nil {
builder.presetIngester(ing)
}
if err := builder.readStages(); err != nil {
return nil, err
}
return build.build()
return builder.build()
}

func (p *Pipeline) Run() {
Expand Down
21 changes: 18 additions & 3 deletions pkg/pipeline/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ func newBuilder(cfg *config.ConfigFileStruct) *builder {
}
}

// use a preset ingester
func (b *builder) presetIngester(ing ingest.Ingester) {
name := config.PresetIngesterStage
log.Debugf("stage = %v", name)
b.appendEntry(pipelineEntry{
stageName: name,
stageType: StageIngest,
Ingester: ing,
})
}

// read the configuration stages definition and instantiate the corresponding native Go objects
func (b *builder) readStages() error {
for _, param := range b.configParams {
Expand All @@ -124,14 +135,18 @@ func (b *builder) readStages() error {
if err != nil {
return err
}
b.pipelineEntryMap[param.Name] = &pEntry
b.pipelineStages = append(b.pipelineStages, &pEntry)
log.Debugf("pipeline = %v", b.pipelineStages)
b.appendEntry(pEntry)
}
log.Debugf("pipeline = %v", b.pipelineStages)
return nil
}

func (b *builder) appendEntry(pEntry pipelineEntry) {
b.pipelineEntryMap[pEntry.stageName] = &pEntry
b.pipelineStages = append(b.pipelineStages, &pEntry)
log.Debugf("pipeline = %v", b.pipelineStages)
}

// reads the configured Go stages and connects between them
// readStages must be invoked before this
func (b *builder) build() (*Pipeline, error) {
Expand Down
Loading