From bfbd89bd6d0d366c53d3c393722d052544ef7daf Mon Sep 17 00:00:00 2001
From: Joel Takvorian
Date: Fri, 29 Jul 2022 11:07:46 +0200
Subject: [PATCH] Merge confgen & main models (#254) (#267)

* Confgen: init from options (avoid many globals)

* Use main model in confgen, use pipeline builder

Fixes #254

- Removed duplicated confgen model
- Use PipelineBuilder to simplify pipeline generation
- Make confgen easier to consume as a lib (e.g. for NOO)
- Do not make it necessary to call "Run": parsing definition files
should be sufficient
- Do not make it necessary to work with files written on disk: work
with []byte instead
- SkipWithTags should not result in an error when a file is skipped
- Add confgen tests

* Avoid using globals

A side effect of removing globals in write_loki is that it changes how
the config is read and how defaults are applied. Instead of
unmarshaling a second time to automatically get defaults, we now call
an explicit function that sets the defaults.
Also, since the Loki URL default is removed, it now has to be set
explicitly.

* Update ConnTrack builder

* More defer cleanup in tests, and use ioutil / temp dirs and files

Also fixed the jsonnet destination being used as a filename prefix
rather than as a directory

* Use ConfigFileStruct in confgen

* Update pkg/config/config.go

Co-authored-by: Ronen Schaffer

* Use config.ConfigFileStruct in tests

Co-authored-by: Ronen Schaffer
---
 cmd/confgenerator/main.go                  |  28 +-
 cmd/flowlogs-pipeline/main.go              |  19 +-
 pkg/api/write_loki.go                      |  36 +-
 pkg/confgen/confgen.go                     |  79 ++---
 pkg/confgen/confgen_test.go                | 335 ++++++++++++++++--
 pkg/confgen/config.go                      |  33 +-
 pkg/confgen/config_test.go                 |  25 +-
 pkg/confgen/dedup.go                       |   2 +-
 pkg/confgen/doc.go                         |   2 +-
 pkg/confgen/flowlogs2metrics_config.go     | 183 +++------
 pkg/confgen/grafana_jsonnet.go             |  10 +-
 pkg/config/config.go                       |  34 +-
 pkg/config/pipeline_builder.go             |  40 ++-
 pkg/config/pipeline_builder_test.go        |   8 +-
 pkg/config/stage_params.go                 |  70 ++++
 pkg/operational/health/health.go           |   4 +-
 pkg/operational/health/health_test.go      |   6 +-
 pkg/pipeline/aggregate_prom_test.go        |   6 +-
 pkg/pipeline/conntrack_integ_test.go       |   4 +-
 pkg/pipeline/encode/encode_kafka_test.go   |   4 +-
 pkg/pipeline/encode/encode_prom_test.go    |   4 +-
 .../extract/aggregate/aggregates_test.go   |   4 +-
 .../extract/extract_aggregate_test.go      |   4 +-
 pkg/pipeline/ingest/ingest_kafka_test.go   |   4 +-
 pkg/pipeline/pipeline.go                   |   6 +-
 pkg/pipeline/pipeline_builder_test.go      |   8 +-
 pkg/pipeline/pipeline_test.go              |  18 +-
 .../transform/transform_filter_test.go     |   4 +-
 .../transform/transform_generic_test.go    |   4 +-
 .../transform/transform_network_test.go    |   4 +-
 pkg/pipeline/transform_multiple_test.go    |   4 +-
 pkg/pipeline/transform_topk_test.go        |   4 +-
 pkg/pipeline/utils/params_parse.go         |  40 ---
 pkg/pipeline/write/write_loki.go           |  28 +-
 pkg/pipeline/write/write_loki_test.go      |  36 +-
 pkg/test/utils.go                          |  36 +-
 pkg/test/utils_test.go                     |   3 +-
 37 files changed, 660 insertions(+), 479 deletions(-)
 create mode 100644 pkg/config/stage_params.go

diff --git a/cmd/confgenerator/main.go b/cmd/confgenerator/main.go
index 110eb1916..ca2f2c87c 100644
--- a/cmd/confgenerator/main.go
+++ b/cmd/confgenerator/main.go
@@ -39,6 +39,7 @@ var (
 	logLevel           string
 	envPrefix          = "FLP_CONFGEN"
 	defaultLogFileName = ".confgen"
+	opts               confgen.Options
 )
 
 // rootCmd represents the root command
@@ -92,8 +93,8 @@ func initLogger() {
 	log.SetFormatter(&log.TextFormatter{DisableColors: false, FullTimestamp: true})
 }
 
-func dumpConfig() {
-	configAsJSON, _ := json.MarshalIndent(confgen.Opt, "", "\t")
+func dumpConfig(opts *confgen.Options) {
+	configAsJSON, _ := json.MarshalIndent(opts, "", "\t")
 	log.Infof("configuration:\n%s\n", configAsJSON)
 }
 
@@ -128,12 +129,12 @@ func initFlags() {
 	rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", fmt.Sprintf("config file (default is $HOME/%s)", defaultLogFileName))
 	rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "error", "Log level: debug, info, warning, error")
-	rootCmd.PersistentFlags().StringVar(&confgen.Opt.SrcFolder, "srcFolder", "network_definitions", "source folder")
-	rootCmd.PersistentFlags().StringVar(&confgen.Opt.DestConfFile, "destConfFile", "/tmp/flowlogs-pipeline.conf.yaml", "destination configuration file")
-	rootCmd.PersistentFlags().StringVar(&confgen.Opt.DestDocFile, "destDocFile", "/tmp/metrics.md", "destination documentation file (.md)")
-	rootCmd.PersistentFlags().StringVar(&confgen.Opt.DestGrafanaJsonnetFolder, "destGrafanaJsonnetFolder", "/tmp/jsonnet", "destination grafana jsonnet folder")
-	rootCmd.PersistentFlags().StringSliceVar(&confgen.Opt.SkipWithTags, "skipWithTags", nil, "Skip definitions with Tags")
-	rootCmd.PersistentFlags().StringSliceVar(&confgen.Opt.GenerateStages, "generateStages", nil, "Produce only specified stages (ingest, transform_generic, transform_network, extract_aggregate, encode_prom, write_loki")
+	rootCmd.PersistentFlags().StringVar(&opts.SrcFolder, "srcFolder", "network_definitions", "source folder")
+	rootCmd.PersistentFlags().StringVar(&opts.DestConfFile, "destConfFile", "/tmp/flowlogs-pipeline.conf.yaml", "destination configuration file")
+	rootCmd.PersistentFlags().StringVar(&opts.DestDocFile, "destDocFile", "/tmp/metrics.md", "destination documentation file (.md)")
+	rootCmd.PersistentFlags().StringVar(&opts.DestGrafanaJsonnetFolder, "destGrafanaJsonnetFolder", "/tmp/jsonnet", "destination grafana jsonnet folder")
+	rootCmd.PersistentFlags().StringSliceVar(&opts.SkipWithTags, "skipWithTags", nil, "Skip definitions with Tags")
+	rootCmd.PersistentFlags().StringSliceVar(&opts.GenerateStages, "generateStages", nil, "Produce only specified stages (ingest, transform_generic, transform_network, extract_aggregate, encode_prom, write_loki)")
 }
 
 func main() {
@@ -150,15 +151,10 @@ func run() {
 	fmt.Printf("Starting %s:\n=====\nBuild Version: %s\nBuild Date: %s\n\n",
 		filepath.Base(os.Args[0]), BuildVersion, BuildDate)
 
 	// Dump the configuration
-	dumpConfig()
+	dumpConfig(&opts)
 
 	// creating a new configuration generator
-	confGen, err := confgen.NewConfGen()
-	if err != nil {
-		log.Fatalf("failed to initialize NewConfGen %s", err)
-		os.Exit(1)
-	}
-
-	err = confGen.Run()
+	confGen := confgen.NewConfGen(&opts)
+	err := confGen.Run()
 	if err != nil {
 		log.Fatalf("failed to initialize NewConfGen %s", err)
 		os.Exit(1)
diff --git a/cmd/flowlogs-pipeline/main.go b/cmd/flowlogs-pipeline/main.go
index 10a3bea1b..25a054167 100644
--- a/cmd/flowlogs-pipeline/main.go
+++ b/cmd/flowlogs-pipeline/main.go
@@ -43,6 +43,7 @@ var (
 	logLevel           string
 	envPrefix          = "FLOWLOGS-PIPILNE"
 	defaultLogFileName = ".flowlogs-pipeline"
+	opts               config.Options
 )
 
 // rootCmd represents the root command
@@ -98,8 +99,8 @@ func initLogger() {
 	log.SetFormatter(&log.TextFormatter{DisableColors: false, FullTimestamp: true, PadLevelText: true, DisableQuote: true})
 }
 
-func dumpConfig() {
-	configAsJSON, _ := json.MarshalIndent(config.Opt, "", " ")
+func dumpConfig(opts config.Options) {
+	configAsJSON, _ := json.MarshalIndent(opts, "", " ")
 	fmt.Printf("Using configuration:\n%s\n", configAsJSON)
 }
 
@@ -133,9 +134,9 @@ func initFlags() {
 	cobra.OnInitialize(initConfig)
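	// Note: flags now bind to the package-level opts (config.Options) declared
	// in this file, replacing the removed config.Opt global; run() then passes
	// opts explicitly to dumpConfig and config.ParseConfig.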
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", fmt.Sprintf("config file (default is $HOME/%s)", defaultLogFileName)) rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "error", "Log level: debug, info, warning, error") - rootCmd.PersistentFlags().StringVar(&config.Opt.Health.Port, "health.port", "8080", "Health server port") - rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine, "pipeline", "", "json of config file pipeline field") - rootCmd.PersistentFlags().StringVar(&config.Opt.Parameters, "parameters", "", "json of config file parameters field") + rootCmd.PersistentFlags().StringVar(&opts.Health.Port, "health.port", "8080", "Health server port") + rootCmd.PersistentFlags().StringVar(&opts.PipeLine, "pipeline", "", "json of config file pipeline field") + rootCmd.PersistentFlags().StringVar(&opts.Parameters, "parameters", "", "json of config file parameters field") } func main() { @@ -159,9 +160,9 @@ func run() { filepath.Base(os.Args[0]), BuildVersion, BuildDate) // Dump configuration - dumpConfig() + dumpConfig(opts) - err = config.ParseConfig() + cfg, err := config.ParseConfig(opts) if err != nil { log.Errorf("error in parsing config file: %v", err) os.Exit(1) @@ -171,14 +172,14 @@ func run() { utils.SetupElegantExit() // Create new flows pipeline - mainPipeline, err = pipeline.NewPipeline() + mainPipeline, err = pipeline.NewPipeline(&cfg) if err != nil { log.Fatalf("failed to initialize pipeline %s", err) os.Exit(1) } // Start health report server - health.NewHealthServer(mainPipeline) + health.NewHealthServer(&opts, mainPipeline) // Starts the flows pipeline mainPipeline.Run() diff --git a/pkg/api/write_loki.go b/pkg/api/write_loki.go index 1cf4ddddf..2f5de6992 100644 --- a/pkg/api/write_loki.go +++ b/pkg/api/write_loki.go @@ -46,18 +46,30 @@ type WriteLoki struct { TimestampScale string `yaml:"timestampScale,omitempty" json:"timestampScale,omitempty" doc:"timestamp units scale (e.g. 
for UNIX = 1s)"` } -func GetWriteLokiDefaults() WriteLoki { - return WriteLoki{ - URL: "http://loki:3100/", - BatchWait: "1s", - BatchSize: 100 * 1024, - Timeout: "10s", - MinBackoff: "1s", - MaxBackoff: "5m", - MaxRetries: 10, - StaticLabels: model.LabelSet{}, - TimestampLabel: "TimeReceived", - TimestampScale: "1s", +func (w *WriteLoki) SetDefaults() { + if w.BatchWait == "" { + w.BatchWait = "1s" + } + if w.BatchSize == 0 { + w.BatchSize = 100 * 1024 + } + if w.Timeout == "" { + w.Timeout = "10s" + } + if w.MinBackoff == "" { + w.MinBackoff = "1s" + } + if w.MaxBackoff == "" { + w.MaxBackoff = "1s" + } + if w.MaxRetries == 0 { + w.MaxRetries = 10 + } + if w.TimestampLabel == "" { + w.TimestampLabel = "TimeReceived" + } + if w.TimestampScale == "" { + w.TimestampScale = "1s" } } diff --git a/pkg/confgen/confgen.go b/pkg/confgen/confgen.go index 7ee0d3dde..80eb60e89 100644 --- a/pkg/confgen/confgen.go +++ b/pkg/confgen/confgen.go @@ -29,7 +29,7 @@ import ( "gopkg.in/yaml.v2" ) -var ( +const ( definitionExt = ".yaml" definitionHeader = "#flp_confgen" configFileName = "config.yaml" @@ -50,6 +50,7 @@ type Definition struct { type Definitions []Definition type ConfGen struct { + opts *Options config *Config transformRules api.NetworkTransformRules aggregateDefinitions aggregate.Definitions @@ -71,26 +72,31 @@ type DefFile struct { func (cg *ConfGen) Run() error { var err error - cg.config, err = cg.ParseConfigFile(Opt.SrcFolder + "/" + configFileName) + cg.config, err = cg.ParseConfigFile(cg.opts.SrcFolder + "/" + configFileName) if err != nil { log.Debugf("cg.ParseConfigFile err: %v ", err) return err } - definitionFiles := cg.GetDefinitionFiles(Opt.SrcFolder) + definitionFiles := getDefinitionFiles(cg.opts.SrcFolder) for _, definitionFile := range definitionFiles { - err := cg.parseFile(definitionFile) + b, err := ioutil.ReadFile(definitionFile) if err != nil { - log.Debugf("cg.parseFile err: %v ", err) + log.Debugf("ioutil.ReadFile err: %v ", err) + continue + } + err = cg.ParseDefinition(definitionFile, b) + if err != nil { + log.Debugf("cg.parseDefinition err: %v ", err) continue } } - cg.Dedupe() + cg.dedupe() - if len(Opt.GenerateStages) != 0 { - config := cg.GenerateTruncatedConfig(Opt.GenerateStages) - err = cg.writeConfigFile(Opt.DestConfFile, config) + if len(cg.opts.GenerateStages) != 0 { + cfg := cg.GenerateTruncatedConfig() + err = cg.writeConfigFile(cg.opts.DestConfFile, cfg) if err != nil { log.Debugf("cg.GenerateTruncatedConfig err: %v ", err) return err @@ -98,20 +104,20 @@ func (cg *ConfGen) Run() error { return nil } else { config := cg.GenerateFlowlogs2PipelineConfig() - err = cg.writeConfigFile(Opt.DestConfFile, config) + err = cg.writeConfigFile(cg.opts.DestConfFile, config) if err != nil { log.Debugf("cg.GenerateFlowlogs2PipelineConfig err: %v ", err) return err } } - err = cg.generateDoc(Opt.DestDocFile) + err = cg.generateDoc(cg.opts.DestDocFile) if err != nil { log.Debugf("cg.generateDoc err: %v ", err) return err } - err = cg.generateGrafanaJsonnet(Opt.DestGrafanaJsonnetFolder) + err = cg.generateGrafanaJsonnet(cg.opts.DestGrafanaJsonnetFolder) if err != nil { log.Debugf("cg.generateGrafanaJsonnet err: %v ", err) return err @@ -120,62 +126,44 @@ func (cg *ConfGen) Run() error { return nil } -func (cg *ConfGen) checkHeader(fileName string) error { - // check header - f, err := os.OpenFile(fileName, os.O_RDONLY, 0644) - if err != nil { - log.Debugf("os.OpenFile error: %v ", err) - return err - } +func checkHeader(bytes []byte) error { header := make([]byte, 
len(definitionHeader)) - _, err = f.Read(header) - if err != nil || string(header) != definitionHeader { - log.Debugf("Wrong header file: %s ", fileName) + copy(header, bytes) + if string(header) != definitionHeader { return fmt.Errorf("wrong header") } - err = f.Close() - if err != nil { - log.Debugf("f.Close err: %v ", err) - return err - } - return nil } -func (cg *ConfGen) parseFile(fileName string) error { - +func (cg *ConfGen) ParseDefinition(name string, bytes []byte) error { // check header - err := cg.checkHeader(fileName) + err := checkHeader(bytes) if err != nil { - log.Debugf("cg.checkHeader err: %v ", err) + log.Debugf("%s cg.checkHeader err: %v ", name, err) return err } // parse yaml var defFile DefFile - yamlFile, err := ioutil.ReadFile(fileName) + err = yaml.Unmarshal(bytes, &defFile) if err != nil { - log.Debugf("ioutil.ReadFile err: %v ", err) - return err - } - err = yaml.Unmarshal(yamlFile, &defFile) - if err != nil { - log.Debugf("yaml.Unmarshal err: %v ", err) + log.Debugf("%s yaml.Unmarshal err: %v ", name, err) return err } //skip if their skip tag match - for _, skipTag := range Opt.SkipWithTags { + for _, skipTag := range cg.opts.SkipWithTags { for _, tag := range defFile.Tags { if skipTag == tag { - return fmt.Errorf("skipping definition %s due to skip tag %s", fileName, tag) + log.Infof("skipping definition %s due to skip tag %s", name, tag) + return nil } } } // parse definition definition := Definition{ - FileName: fileName, + FileName: name, Description: defFile.Description, Details: defFile.Details, Usage: defFile.Usage, @@ -215,7 +203,7 @@ func (cg *ConfGen) parseFile(fileName string) error { return nil } -func (*ConfGen) GetDefinitionFiles(rootPath string) []string { +func getDefinitionFiles(rootPath string) []string { var files []string @@ -235,11 +223,12 @@ func (*ConfGen) GetDefinitionFiles(rootPath string) []string { return files } -func NewConfGen() (*ConfGen, error) { +func NewConfGen(opts *Options) *ConfGen { return &ConfGen{ + opts: opts, transformRules: api.NetworkTransformRules{}, aggregateDefinitions: aggregate.Definitions{}, definitions: Definitions{}, visualizations: Visualizations{}, - }, nil + } } diff --git a/pkg/confgen/confgen_test.go b/pkg/confgen/confgen_test.go index f2b416a72..adcd61d39 100644 --- a/pkg/confgen/confgen_test.go +++ b/pkg/confgen/confgen_test.go @@ -18,38 +18,89 @@ package confgen import ( + "io/ioutil" "os" "path" "path/filepath" "testing" + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" ) -func getConfGen() *ConfGen { - return &ConfGen{} -} - func Test_checkHeader(t *testing.T) { - filename := "/tmp/header.check.txt" - fakeFilename := "/tmp/fake_file.does.exist" wrongHeader := "#wrong_confgen" - cg := getConfGen() - err := cg.checkHeader(fakeFilename) - require.Error(t, err) - err = os.WriteFile(filename, []byte(wrongHeader), 0644) - require.NoError(t, err) - err = cg.checkHeader(filename) + err := checkHeader([]byte(wrongHeader)) require.Error(t, err) - err = os.WriteFile(filename, []byte(definitionHeader), 0644) - require.NoError(t, err) - err = cg.checkHeader(filename) + err = checkHeader([]byte(definitionHeader)) require.NoError(t, err) } -const networkDefinitionConfiguration = `#flp_confgen +const shortConfig = `#flp_confgen +description: + test description +ingest: + collector: + port: 2155 + portLegacy: 2156 + hostName: 0.0.0.0 +encode: + type: prom + prom: + 
prefix: flp_ + port: 9102 +visualization: + type: grafana + grafana: + dashboards: + - name: "details" + title: "Flow-Logs to Metrics - Details" + time_from: "now-15m" + tags: "['flp','grafana','dashboard','details']" + schemaVersion: "16" +` + +const longConfig = `#flp_confgen +description: + test description +ingest: + collector: + port: 2155 + portLegacy: 2156 + hostName: 0.0.0.0 +transform: + generic: + rules: + - input: SrcAddr + output: srcIP +encode: + type: prom + prom: + prefix: flp_ + port: 9102 +write: + type: loki + loki: + url: http://loki:3100 + staticLabels: + job: flowlogs-pipeline +visualization: + type: grafana + grafana: + dashboards: + - name: "details" + title: "Flow-Logs to Metrics - Details" + time_from: "now-15m" + tags: "['flp','grafana','dashboard','details']" + schemaVersion: "16" +` + +const networkDefs = `#flp_confgen description: test description details: @@ -85,41 +136,249 @@ encode: visualization: type: grafana grafana: - - expr: 'test expression' - type: graphPanel - dashboard: test - title: - Test grafana title + - expr: 'test expression' + type: graphPanel + dashboard: test + title: + Test grafana title ` -func Test_parseFile(t *testing.T) { - fakeFilename := "/tmp/fake_file.does.exist" - filename := "/tmp/parse_file.check.txt" - cg := getConfGen() - err := cg.parseFile(fakeFilename) - require.Error(t, err) - - err = os.WriteFile(filename, []byte(networkDefinitionConfiguration), 0644) - require.NoError(t, err) - err = cg.parseFile(filename) +func Test_ParseDefinition(t *testing.T) { + cg := NewConfGen(&Options{}) + err := cg.ParseDefinition("def", []byte(networkDefs)) require.NoError(t, err) } func Test_getDefinitionFiles(t *testing.T) { - dirPath := "/tmp/getDefinitionFilesTest" filename := "/def.yaml" - cg := getConfGen() - err := os.MkdirAll(dirPath, 0755) + dirPath, err := ioutil.TempDir("", "getDefinitionFilesTest") require.NoError(t, err) - err = os.WriteFile(filepath.Join(dirPath, filename), []byte(networkDefinitionConfiguration), 0644) + defer os.RemoveAll(dirPath) + err = os.WriteFile(filepath.Join(dirPath, filename), []byte(networkDefs), 0644) require.NoError(t, err) - files := cg.GetDefinitionFiles(dirPath) + files := getDefinitionFiles(dirPath) require.Equal(t, 1, len(files)) expected := []string{path.Join(dirPath, filename)} require.ElementsMatch(t, expected, files) } -func Test_NewConfGen(t *testing.T) { - _, err := NewConfGen() +func Test_RunShortConfGen(t *testing.T) { + // Prepare + dirPath, err := ioutil.TempDir("", "RunShortConfGenTest") + require.NoError(t, err) + defer os.RemoveAll(dirPath) + outDirPath, err := ioutil.TempDir("", "RunShortConfGenTest_out") + require.NoError(t, err) + defer os.RemoveAll(outDirPath) + + configOut := filepath.Join(outDirPath, "config.yaml") + docOut := filepath.Join(outDirPath, "doc.md") + jsonnetOut := filepath.Join(outDirPath, "jsonnet") + + cg := NewConfGen(&Options{ + SrcFolder: dirPath, + DestConfFile: configOut, + DestDocFile: docOut, + DestGrafanaJsonnetFolder: jsonnetOut, + }) + err = os.WriteFile(filepath.Join(dirPath, configFileName), []byte(shortConfig), 0644) + require.NoError(t, err) + err = os.WriteFile(filepath.Join(dirPath, "def.yaml"), []byte(networkDefs), 0644) + require.NoError(t, err) + + // Run + err = cg.Run() + require.NoError(t, err) + + // Unmarshal output + destCfgBytes, err := ioutil.ReadFile(configOut) + require.NoError(t, err) + var out config.ConfigFileStruct + err = yaml.Unmarshal(destCfgBytes, &out) + require.NoError(t, err) + require.Len(t, out.Pipeline, 4) + 
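	// shortConfig defines no generic transform and no Loki writer, so confgen
	// emits only 4 stages with 4 matching parameter sets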
require.Len(t, out.Parameters, 4) + + // Pipeline structure + require.Equal(t, + []config.Stage( + []config.Stage{{Name: "ingest_collector"}, + {Name: "transform_network", Follows: "ingest_collector"}, + {Name: "extract_aggregate", Follows: "transform_network"}, + {Name: "encode_prom", Follows: "extract_aggregate"}}), + out.Pipeline, + ) + + // Expects ingest + require.Equal(t, &api.IngestCollector{ + HostName: "0.0.0.0", + Port: 2155, + PortLegacy: 2156, + }, out.Parameters[0].Ingest.Collector) + + // Expects transform network + require.Len(t, out.Parameters[1].Transform.Network.Rules, 1) + require.Equal(t, api.NetworkTransformRule{ + Input: "testInput", + Output: "testOutput", + Type: "add_service", + Parameters: "proto", + }, out.Parameters[1].Transform.Network.Rules[0]) + + // Expects aggregates + require.Len(t, out.Parameters[2].Extract.Aggregates, 1) + require.Equal(t, api.AggregateDefinition{ + Name: "test_aggregates", + By: api.AggregateBy{"service"}, + Operation: "sum", + RecordKey: "test_record_key", + }, out.Parameters[2].Extract.Aggregates[0]) + + // Expects prom encode + require.Len(t, out.Parameters[3].Encode.Prom.Metrics, 1) + require.Equal(t, &api.PromEncode{ + Port: 9102, + Prefix: "flp_", + Metrics: api.PromMetricsItems{{ + Name: "test_metric", + Type: "gauge", + Filter: api.PromMetricsFilter{Key: "", Value: ""}, + ValueKey: "test_aggregates_value", + Labels: []string{"by", "aggregate"}, + Buckets: []float64{}, + }}, + }, out.Parameters[3].Encode.Prom) +} + +func Test_RunLongConfGen(t *testing.T) { + // Prepare + dirPath, err := ioutil.TempDir("", "RunLongConfGenTest") require.NoError(t, err) + defer os.RemoveAll(dirPath) + outDirPath, err := ioutil.TempDir("", "RunLongConfGenTest_out") + require.NoError(t, err) + defer os.RemoveAll(outDirPath) + + configOut := filepath.Join(outDirPath, "config.yaml") + docOut := filepath.Join(outDirPath, "doc.md") + jsonnetOut := filepath.Join(outDirPath, "jsonnet") + + cg := NewConfGen(&Options{ + SrcFolder: dirPath, + DestConfFile: configOut, + DestDocFile: docOut, + DestGrafanaJsonnetFolder: jsonnetOut, + }) + err = os.WriteFile(filepath.Join(dirPath, configFileName), []byte(longConfig), 0644) + require.NoError(t, err) + err = os.WriteFile(filepath.Join(dirPath, "def.yaml"), []byte(networkDefs), 0644) + require.NoError(t, err) + + // Run + err = cg.Run() + require.NoError(t, err) + + // Unmarshal output + destCfgBytes, err := ioutil.ReadFile(configOut) + require.NoError(t, err) + var out config.ConfigFileStruct + err = yaml.Unmarshal(destCfgBytes, &out) + require.NoError(t, err) + require.Len(t, out.Parameters, 6) + require.Len(t, out.Pipeline, 6) + + // Pipeline structure + require.Equal(t, + []config.Stage( + []config.Stage{{Name: "ingest_collector"}, + {Name: "transform_generic", Follows: "ingest_collector"}, + {Name: "transform_network", Follows: "transform_generic"}, + {Name: "extract_aggregate", Follows: "transform_network"}, + {Name: "encode_prom", Follows: "extract_aggregate"}, + {Name: "write_loki", Follows: "transform_network"}}), + out.Pipeline, + ) + + // Expects ingest + require.Equal(t, &api.IngestCollector{ + HostName: "0.0.0.0", + Port: 2155, + PortLegacy: 2156, + }, out.Parameters[0].Ingest.Collector) + + // Expects transform generic + require.Equal(t, "replace_keys", out.Parameters[1].Transform.Generic.Policy) + + // Expects transform network + require.Len(t, out.Parameters[2].Transform.Network.Rules, 1) + require.Equal(t, api.NetworkTransformRule{ + Input: "testInput", + Output: "testOutput", + Type: "add_service", 
+ Parameters: "proto", + }, out.Parameters[2].Transform.Network.Rules[0]) + + // Expects aggregates + require.Len(t, out.Parameters[3].Extract.Aggregates, 1) + require.Equal(t, api.AggregateDefinition{ + Name: "test_aggregates", + By: api.AggregateBy{"service"}, + Operation: "sum", + RecordKey: "test_record_key", + }, out.Parameters[3].Extract.Aggregates[0]) + + // Expects prom encode + require.Len(t, out.Parameters[4].Encode.Prom.Metrics, 1) + require.Equal(t, &api.PromEncode{ + Port: 9102, + Prefix: "flp_", + Metrics: api.PromMetricsItems{{ + Name: "test_metric", + Type: "gauge", + Filter: api.PromMetricsFilter{Key: "", Value: ""}, + ValueKey: "test_aggregates_value", + Labels: []string{"by", "aggregate"}, + Buckets: []float64{}, + }}, + }, out.Parameters[4].Encode.Prom) + + // Expects loki + require.Equal(t, &api.WriteLoki{ + URL: "http://loki:3100", + StaticLabels: model.LabelSet{"job": "flowlogs-pipeline"}, + }, out.Parameters[5].Write.Loki) +} + +func Test_GenerateTruncatedConfig(t *testing.T) { + // Prepare + cg := NewConfGen(&Options{ + GenerateStages: []string{"extract_aggregate", "encode_prom"}, + }) + err := cg.ParseDefinition("defs", []byte(networkDefs)) + require.NoError(t, err) + + // Run + params := cg.GenerateTruncatedConfig() + + require.Len(t, params, 2) + // Expects aggregates + require.Len(t, params[0].Extract.Aggregates, 1) + require.Equal(t, api.AggregateDefinition{ + Name: "test_aggregates", + By: api.AggregateBy{"service"}, + Operation: "sum", + RecordKey: "test_record_key", + }, params[0].Extract.Aggregates[0]) + + // Expects prom encode + require.Len(t, params[1].Encode.Prom.Metrics, 1) + require.Equal(t, &api.PromEncode{ + Metrics: api.PromMetricsItems{{ + Name: "test_metric", + Type: "gauge", + Filter: api.PromMetricsFilter{Key: "", Value: ""}, + ValueKey: "test_aggregates_value", + Labels: []string{"by", "aggregate"}, + }}, + }, params[1].Encode.Prom) } diff --git a/pkg/confgen/config.go b/pkg/confgen/config.go index a4743e279..5d9d012c4 100644 --- a/pkg/confgen/config.go +++ b/pkg/confgen/config.go @@ -21,7 +21,7 @@ import ( "io/ioutil" "os" - "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "gopkg.in/yaml.v2" @@ -36,37 +36,16 @@ type Options struct { GenerateStages []string } -var ( - Opt = Options{} -) - -type ConfigIngest struct { - Collector api.IngestCollector `yaml:"collector"` -} - -type ConfigTransform struct { - Generic api.TransformGeneric `yaml:"generic"` -} - -type ConfigEncode struct { - Prom api.PromEncode `yaml:"prom"` -} - -type ConfigWrite struct { - Loki api.WriteLoki `yaml:"loki"` - Type string `yaml:"type"` -} - type ConfigVisualization struct { Grafana ConfigVisualizationGrafana `yaml:"grafana"` } type Config struct { Description string `yaml:"description"` - Ingest ConfigIngest `yaml:"ingest"` - Transform ConfigTransform `yaml:"transform"` - Write ConfigWrite `yaml:"write"` - Encode ConfigEncode `yaml:"encode"` + Ingest config.Ingest `yaml:"ingest"` + Transform config.Transform `yaml:"transform"` + Write config.Write `yaml:"write"` + Encode config.Encode `yaml:"encode"` Visualization ConfigVisualization `yaml:"visualization"` } @@ -75,7 +54,7 @@ func (cg *ConfGen) ParseConfigFile(fileName string) (*Config, error) { // provide a minimal config for when config file is missing (as for Netobserv Openshift Operator) var config Config if _, err := os.Stat(fileName); errors.Is(err, os.ErrNotExist) { - if len(Opt.GenerateStages) == 0 { + 
if len(cg.opts.GenerateStages) == 0 { log.Errorf("config file %s does not exist", fileName) return nil, err } diff --git a/pkg/confgen/config_test.go b/pkg/confgen/config_test.go index 2b3937f26..78781a9e6 100644 --- a/pkg/confgen/config_test.go +++ b/pkg/confgen/config_test.go @@ -18,24 +18,26 @@ package confgen import ( + "io/ioutil" "os" "testing" "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/stretchr/testify/require" ) func expectedConfig() *Config { return &Config{ Description: "test description", - Encode: ConfigEncode{ - Prom: api.PromEncode{ + Encode: config.Encode{ + Prom: &api.PromEncode{ Port: 7777, Prefix: "prefix", }, }, - Ingest: ConfigIngest{ - Collector: api.IngestCollector{ + Ingest: config.Ingest{ + Collector: &api.IngestCollector{ Port: 8888, }, }, @@ -58,11 +60,16 @@ encode: ` func Test_parseConfigFile(t *testing.T) { - filename := "/tmp/config" - cg := getConfGen() - err := os.WriteFile(filename, []byte(testConfig), 0644) - require.Equal(t, err, nil) - config, err := cg.ParseConfigFile(filename) + file, err := ioutil.TempFile("", "config.yaml") + require.NoError(t, err) + defer os.Remove(file.Name()) + cg := NewConfGen(&Options{}) + _, err = file.Write([]byte(testConfig)) + require.NoError(t, err) + err = file.Close() + require.NoError(t, err) + + config, err := cg.ParseConfigFile(file.Name()) require.NoError(t, err) require.Equal(t, config, expectedConfig()) } diff --git a/pkg/confgen/dedup.go b/pkg/confgen/dedup.go index 0255468eb..e09d19953 100644 --- a/pkg/confgen/dedup.go +++ b/pkg/confgen/dedup.go @@ -25,7 +25,7 @@ import ( log "github.com/sirupsen/logrus" ) -func (cg *ConfGen) Dedupe() { +func (cg *ConfGen) dedupe() { cg.transformRules = dedupeNetworkTransformRules(cg.transformRules) cg.aggregateDefinitions = dedupeAggregateDefinitions(cg.aggregateDefinitions) } diff --git a/pkg/confgen/doc.go b/pkg/confgen/doc.go index a3b03fd51..dd4366e93 100644 --- a/pkg/confgen/doc.go +++ b/pkg/confgen/doc.go @@ -109,7 +109,7 @@ and the transformation to generate the exported metric. 
- `, Opt.SrcFolder) + `, cg.opts.SrcFolder) data := fmt.Sprintf("%s\n%s\n", header, doc) err := ioutil.WriteFile(fileName, []byte(data), 0664) if err != nil { diff --git a/pkg/confgen/flowlogs2metrics_config.go b/pkg/confgen/flowlogs2metrics_config.go index 4f9fe3f1a..6501fb6ad 100644 --- a/pkg/confgen/flowlogs2metrics_config.go +++ b/pkg/confgen/flowlogs2metrics_config.go @@ -22,163 +22,70 @@ import ( "io/ioutil" "github.com/netobserv/flowlogs-pipeline/pkg/api" - config "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/config" log "github.com/sirupsen/logrus" "gopkg.in/yaml.v2" ) -func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() config.ConfigFileStruct { - configStruct := config.ConfigFileStruct{ - LogLevel: "error", - Pipeline: []config.Stage{ - {Name: "ingest_collector"}, - {Name: "transform_generic", - Follows: "ingest_collector", - }, - {Name: "transform_network", - Follows: "transform_generic", - }, - {Name: "extract_aggregate", - Follows: "transform_network", - }, - {Name: "encode_prom", - Follows: "extract_aggregate", - }, - {Name: "write_loki", - Follows: "transform_network", - }, - }, - Parameters: []config.StageParam{ - {Name: "ingest_collector", - Ingest: &config.Ingest{ - Type: "collector", - Collector: &api.IngestCollector{ - Port: cg.config.Ingest.Collector.Port, - PortLegacy: cg.config.Ingest.Collector.PortLegacy, - HostName: cg.config.Ingest.Collector.HostName, - }, - }, - }, - {Name: "transform_generic", - Transform: &config.Transform{ - Type: "generic", - Generic: &api.TransformGeneric{ - Policy: "replace_keys", - Rules: cg.config.Transform.Generic.Rules, - }, - }, - }, - {Name: "transform_network", - Transform: &config.Transform{ - Type: "network", - Network: &api.TransformNetwork{ - Rules: cg.transformRules, - }, - }, - }, - {Name: "extract_aggregate", - Extract: &config.Extract{ - Type: "aggregates", - Aggregates: cg.aggregateDefinitions, - }, - }, - {Name: "encode_prom", - Encode: &config.Encode{ - Type: "prom", - Prom: &api.PromEncode{ - Port: cg.config.Encode.Prom.Port, - Prefix: cg.config.Encode.Prom.Prefix, - Metrics: cg.promMetrics, - }, - }, - }, - {Name: "write_loki", - Write: &config.Write{ - Type: cg.config.Write.Type, - Loki: &cg.config.Write.Loki, - }, - }, - }, +func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() *config.ConfigFileStruct { + pipeline, _ := config.NewPipeline("ingest_collector", &cg.config.Ingest) + next := pipeline + if cg.config.Transform.Generic != nil { + gen := *cg.config.Transform.Generic + if len(gen.Policy) == 0 { + gen.Policy = "replace_keys" + } + next = next.TransformGeneric("transform_generic", gen) + } + if len(cg.transformRules) > 0 { + next = next.TransformNetwork("transform_network", api.TransformNetwork{ + Rules: cg.transformRules, + }) + } + if len(cg.aggregateDefinitions) > 0 { + agg := next.Aggregate("extract_aggregate", cg.aggregateDefinitions) + agg.EncodePrometheus("encode_prom", api.PromEncode{ + Port: cg.config.Encode.Prom.Port, + Prefix: cg.config.Encode.Prom.Prefix, + Metrics: cg.promMetrics, + }) + } + if cg.config.Write.Loki != nil { + next.WriteLoki("write_loki", *cg.config.Write.Loki) + } + return &config.ConfigFileStruct{ + LogLevel: "error", + Pipeline: pipeline.GetStages(), + Parameters: pipeline.GetStageParams(), } - return configStruct } -func (cg *ConfGen) GenerateTruncatedConfig(stages []string) config.ConfigFileStruct { - parameters := make([]config.StageParam, len(stages)) - for i, stage := range stages { +func (cg *ConfGen) 
GenerateTruncatedConfig() []config.StageParam { + parameters := make([]config.StageParam, len(cg.opts.GenerateStages)) + for i, stage := range cg.opts.GenerateStages { switch stage { case "ingest": - parameters[i] = config.StageParam{ - Name: "ingest_collector", - Ingest: &config.Ingest{ - Type: "collector", - Collector: &api.IngestCollector{ - Port: cg.config.Ingest.Collector.Port, - PortLegacy: cg.config.Ingest.Collector.PortLegacy, - HostName: cg.config.Ingest.Collector.HostName, - }, - }, - } + parameters[i] = config.NewCollectorParams("ingest_collector", *cg.config.Ingest.Collector) case "transform_generic": - parameters[i] = config.StageParam{ - Name: "transform_generic", - Transform: &config.Transform{ - Type: "generic", - Generic: &api.TransformGeneric{ - Policy: "replace_keys", - Rules: cg.config.Transform.Generic.Rules, - }, - }, - } + parameters[i] = config.NewTransformGenericParams("transform_generic", *cg.config.Transform.Generic) case "transform_network": - parameters[i] = config.StageParam{ - Name: "transform_network", - Transform: &config.Transform{ - Type: "network", - Network: &api.TransformNetwork{ - Rules: cg.transformRules, - }, - }, - } + parameters[i] = config.NewTransformNetworkParams("transform_network", *cg.config.Transform.Network) case "extract_aggregate": - parameters[i] = config.StageParam{ - Name: "extract_aggregate", - Extract: &config.Extract{ - Type: "aggregates", - Aggregates: cg.aggregateDefinitions, - }, - } + parameters[i] = config.NewAggregateParams("extract_aggregate", cg.aggregateDefinitions) case "encode_prom": - parameters[i] = config.StageParam{ - Name: "encode_prom", - Encode: &config.Encode{ - Type: "prom", - Prom: &api.PromEncode{ - Port: cg.config.Encode.Prom.Port, - Prefix: cg.config.Encode.Prom.Prefix, - Metrics: cg.promMetrics, - }, - }, - } + parameters[i] = config.NewEncodePrometheusParams("encode_prom", api.PromEncode{ + Metrics: cg.promMetrics, + }) case "write_loki": - parameters[i] = config.StageParam{ - Name: "write_loki", - Write: &config.Write{ - Type: cg.config.Write.Type, - Loki: &cg.config.Write.Loki, - }, - } + parameters[i] = config.NewWriteLokiParams("write_loki", *cg.config.Write.Loki) } } log.Debugf("parameters = %v \n", parameters) - configStruct := config.ConfigFileStruct{ - Parameters: parameters, - } - return configStruct + return parameters } -func (cg *ConfGen) writeConfigFile(fileName string, config config.ConfigFileStruct) error { - configData, err := yaml.Marshal(&config) +func (cg *ConfGen) writeConfigFile(fileName string, cfg interface{}) error { + configData, err := yaml.Marshal(cfg) if err != nil { return err } diff --git a/pkg/confgen/grafana_jsonnet.go b/pkg/confgen/grafana_jsonnet.go index d7d4ef071..5871422f6 100644 --- a/pkg/confgen/grafana_jsonnet.go +++ b/pkg/confgen/grafana_jsonnet.go @@ -20,6 +20,7 @@ package confgen import ( "bytes" "os" + "path/filepath" "text/template" log "github.com/sirupsen/logrus" @@ -165,7 +166,6 @@ type Dashboard struct { } func (cg *ConfGen) generateGrafanaJsonnet(folderName string) error { - // generate dashboards dashboards, err := cg.generateGrafanaJsonnetDashboards() if err != nil { @@ -180,13 +180,19 @@ func (cg *ConfGen) generateGrafanaJsonnet(folderName string) error { return err } + err = os.MkdirAll(folderName, 0755) + if err != nil { + log.Debugf("os.MkdirAll err: %v ", err) + return err + } + // write to destination files for _, dashboard := range dashboards { output := []byte(jsonNetHeaderTemplate) output = append(output, dashboard.Header...) 
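		// each dashboard document is assembled as the shared jsonnet header
		// template, then the dashboard's own header, then its panels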
output = append(output, dashboard.Panels...) - fileName := folderName + "dashboard_" + dashboard.Name + ".jsonnet" + fileName := filepath.Join(folderName, "dashboard_"+dashboard.Name+".jsonnet") err = os.WriteFile(fileName, output, 0644) if err != nil { log.Debugf("os.WriteFile to file %s err: %v ", fileName, err) diff --git a/pkg/config/config.go b/pkg/config/config.go index 3c26d9f12..85f5ad12e 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -24,28 +24,22 @@ import ( "github.com/sirupsen/logrus" ) -var ( - Opt = Options{} - PipeLine []Stage - Parameters []StageParam -) - type Options struct { PipeLine string Parameters string Health Health } -type Health struct { - Port string -} - type ConfigFileStruct struct { LogLevel string `yaml:"log-level,omitempty" json:"log-level,omitempty"` Pipeline []Stage `yaml:"pipeline,omitempty" json:"pipeline,omitempty"` Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"` } +type Health struct { + Port string +} + type Stage struct { Name string `yaml:"name" json:"name"` Follows string `yaml:"follows,omitempty" json:"follows,omitempty"` @@ -101,20 +95,22 @@ type Write struct { } // ParseConfig creates the internal unmarshalled representation from the Pipeline and Parameters json -func ParseConfig() error { - logrus.Debugf("config.Opt.PipeLine = %v ", Opt.PipeLine) - err := json.Unmarshal([]byte(Opt.PipeLine), &PipeLine) +func ParseConfig(opts Options) (ConfigFileStruct, error) { + out := ConfigFileStruct{} + + logrus.Debugf("opts.PipeLine = %v ", opts.PipeLine) + err := json.Unmarshal([]byte(opts.PipeLine), &out.Pipeline) if err != nil { logrus.Errorf("error when reading config file: %v", err) - return err + return out, err } - logrus.Debugf("stages = %v ", PipeLine) + logrus.Debugf("stages = %v ", out.Pipeline) - err = json.Unmarshal([]byte(Opt.Parameters), &Parameters) + err = json.Unmarshal([]byte(opts.Parameters), &out.Parameters) if err != nil { logrus.Errorf("error when reading config file: %v", err) - return err + return out, err } - logrus.Debugf("params = %v ", Parameters) - return nil + logrus.Debugf("params = %v ", out.Parameters) + return out, nil } diff --git a/pkg/config/pipeline_builder.go b/pkg/config/pipeline_builder.go index 35430cd44..c39707136 100644 --- a/pkg/config/pipeline_builder.go +++ b/pkg/config/pipeline_builder.go @@ -18,6 +18,8 @@ package config import ( + "errors" + "github.com/netobserv/flowlogs-pipeline/pkg/api" ) @@ -45,11 +47,25 @@ type PipelineBuilderStage struct { pipeline *pipeline } +// NewPipeline creates a new pipeline from an existing ingest +func NewPipeline(name string, ingest *Ingest) (PipelineBuilderStage, error) { + if ingest.Collector != nil { + return NewCollectorPipeline(name, *ingest.Collector), nil + } + if ingest.GRPC != nil { + return NewGRPCPipeline(name, *ingest.GRPC), nil + } + if ingest.Kafka != nil { + return NewKafkaPipeline(name, *ingest.Kafka), nil + } + return PipelineBuilderStage{}, errors.New("Missing ingest params") +} + // NewCollectorPipeline creates a new pipeline from an `IngestCollector` initial stage (listening for NetFlows / IPFIX) func NewCollectorPipeline(name string, ingest api.IngestCollector) PipelineBuilderStage { p := pipeline{ stages: []Stage{{Name: name}}, - config: []StageParam{{Name: name, Ingest: &Ingest{Type: api.CollectorType, Collector: &ingest}}}, + config: []StageParam{NewCollectorParams(name, ingest)}, } return PipelineBuilderStage{pipeline: &p, lastStage: name} } @@ -58,7 +74,7 @@ func NewCollectorPipeline(name string, 
ingest api.IngestCollector) PipelineBuild func NewGRPCPipeline(name string, ingest api.IngestGRPCProto) PipelineBuilderStage { p := pipeline{ stages: []Stage{{Name: name}}, - config: []StageParam{{Name: name, Ingest: &Ingest{Type: api.GRPCType, GRPC: &ingest}}}, + config: []StageParam{NewGRPCParams(name, ingest)}, } return PipelineBuilderStage{pipeline: &p, lastStage: name} } @@ -67,7 +83,7 @@ func NewGRPCPipeline(name string, ingest api.IngestGRPCProto) PipelineBuilderSta func NewKafkaPipeline(name string, ingest api.IngestKafka) PipelineBuilderStage { p := pipeline{ stages: []Stage{{Name: name}}, - config: []StageParam{{Name: name, Ingest: &Ingest{Type: api.KafkaType, Kafka: &ingest}}}, + config: []StageParam{NewKafkaParams(name, ingest)}, } return PipelineBuilderStage{pipeline: &p, lastStage: name} } @@ -80,47 +96,47 @@ func (b *PipelineBuilderStage) next(name string, param StageParam) PipelineBuild // Aggregate chains the current stage with an aggregate stage and returns that new stage func (b *PipelineBuilderStage) Aggregate(name string, aggs []api.AggregateDefinition) PipelineBuilderStage { - return b.next(name, StageParam{Name: name, Extract: &Extract{Type: api.AggregateType, Aggregates: aggs}}) + return b.next(name, NewAggregateParams(name, aggs)) } // TransformGeneric chains the current stage with a TransformGeneric stage and returns that new stage func (b *PipelineBuilderStage) TransformGeneric(name string, gen api.TransformGeneric) PipelineBuilderStage { - return b.next(name, StageParam{Name: name, Transform: &Transform{Type: api.GenericType, Generic: &gen}}) + return b.next(name, NewTransformGenericParams(name, gen)) } // TransformFilter chains the current stage with a TransformFilter stage and returns that new stage func (b *PipelineBuilderStage) TransformFilter(name string, filter api.TransformFilter) PipelineBuilderStage { - return b.next(name, StageParam{Name: name, Transform: &Transform{Type: api.FilterType, Filter: &filter}}) + return b.next(name, NewTransformFilterParams(name, filter)) } // TransformNetwork chains the current stage with a TransformNetwork stage and returns that new stage func (b *PipelineBuilderStage) TransformNetwork(name string, nw api.TransformNetwork) PipelineBuilderStage { - return b.next(name, StageParam{Name: name, Transform: &Transform{Type: api.NetworkType, Network: &nw}}) + return b.next(name, NewTransformNetworkParams(name, nw)) } // ConnTrack chains the current stage with a ConnTrack stage and returns that new stage func (b *PipelineBuilderStage) ConnTrack(name string, ct api.ConnTrack) PipelineBuilderStage { - return b.next(name, StageParam{Name: name, Extract: &Extract{Type: api.ConnTrackType, ConnTrack: &ct}}) + return b.next(name, NewConnTrackParams(name, ct)) } // EncodePrometheus chains the current stage with a PromEncode stage (to expose metrics in Prometheus format) and returns that new stage func (b *PipelineBuilderStage) EncodePrometheus(name string, prom api.PromEncode) PipelineBuilderStage { - return b.next(name, StageParam{Name: name, Encode: &Encode{Type: api.PromType, Prom: &prom}}) + return b.next(name, NewEncodePrometheusParams(name, prom)) } // EncodeKafka chains the current stage with an EncodeKafka stage (writing to a Kafka topic) and returns that new stage func (b *PipelineBuilderStage) EncodeKafka(name string, kafka api.EncodeKafka) PipelineBuilderStage { - return b.next(name, StageParam{Name: name, Encode: &Encode{Type: api.KafkaType, Kafka: &kafka}}) + return b.next(name, NewEncodeKafkaParams(name, kafka)) } // 
WriteStdout chains the current stage with a WriteStdout stage and returns that new stage func (b *PipelineBuilderStage) WriteStdout(name string, stdout api.WriteStdout) PipelineBuilderStage { - return b.next(name, StageParam{Name: name, Write: &Write{Type: api.StdoutType, Stdout: &stdout}}) + return b.next(name, NewWriteStdoutParams(name, stdout)) } // WriteLoki chains the current stage with a WriteLoki stage and returns that new stage func (b *PipelineBuilderStage) WriteLoki(name string, loki api.WriteLoki) PipelineBuilderStage { - return b.next(name, StageParam{Name: name, Write: &Write{Type: api.LokiType, Loki: &loki}}) + return b.next(name, NewWriteLokiParams(name, loki)) } // GetStages returns the current pipeline stages. It can be called from any of the stages, they share the same pipeline reference. diff --git a/pkg/config/pipeline_builder_test.go b/pkg/config/pipeline_builder_test.go index d5ed5a832..0ae96ddc6 100644 --- a/pkg/config/pipeline_builder_test.go +++ b/pkg/config/pipeline_builder_test.go @@ -36,7 +36,7 @@ func TestLokiPipeline(t *testing.T) { Input: "DstAddr", Output: "DstK8S", }}}) - pl = pl.WriteLoki("loki", api.GetWriteLokiDefaults()) + pl = pl.WriteLoki("loki", api.WriteLoki{URL: "http://loki:3100/"}) stages := pl.GetStages() require.Len(t, stages, 3) @@ -57,7 +57,7 @@ func TestLokiPipeline(t *testing.T) { b, err = json.Marshal(params[2]) require.NoError(t, err) - require.Equal(t, `{"name":"loki","write":{"type":"loki","loki":{"url":"http://loki:3100/","batchWait":"1s","batchSize":102400,"timeout":"10s","minBackoff":"1s","maxBackoff":"5m","maxRetries":10,"timestampLabel":"TimeReceived","timestampScale":"1s"}}}`, string(b)) + require.Equal(t, `{"name":"loki","write":{"type":"loki","loki":{"url":"http://loki:3100/"}}}`, string(b)) } func TestGRPCPipeline(t *testing.T) { @@ -165,7 +165,7 @@ func TestKafkaPromPipeline(t *testing.T) { func TestForkPipeline(t *testing.T) { plFork := NewCollectorPipeline("ingest", api.IngestCollector{HostName: "127.0.0.1", Port: 9999}) - plFork.WriteLoki("loki", api.GetWriteLokiDefaults()) + plFork.WriteLoki("loki", api.WriteLoki{URL: "http://loki:3100/"}) plFork.WriteStdout("stdout", api.WriteStdout{}) stages := plFork.GetStages() require.Len(t, stages, 3) @@ -183,7 +183,7 @@ func TestForkPipeline(t *testing.T) { b, err = json.Marshal(params[1]) require.NoError(t, err) - require.Equal(t, `{"name":"loki","write":{"type":"loki","loki":{"url":"http://loki:3100/","batchWait":"1s","batchSize":102400,"timeout":"10s","minBackoff":"1s","maxBackoff":"5m","maxRetries":10,"timestampLabel":"TimeReceived","timestampScale":"1s"}}}`, string(b)) + require.Equal(t, `{"name":"loki","write":{"type":"loki","loki":{"url":"http://loki:3100/"}}}`, string(b)) b, err = json.Marshal(params[2]) require.NoError(t, err) diff --git a/pkg/config/stage_params.go b/pkg/config/stage_params.go new file mode 100644 index 000000000..99e0ea85b --- /dev/null +++ b/pkg/config/stage_params.go @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2021 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package config + +import ( + "github.com/netobserv/flowlogs-pipeline/pkg/api" +) + +func NewCollectorParams(name string, ingest api.IngestCollector) StageParam { + return StageParam{Name: name, Ingest: &Ingest{Type: api.CollectorType, Collector: &ingest}} +} + +func NewGRPCParams(name string, ingest api.IngestGRPCProto) StageParam { + return StageParam{Name: name, Ingest: &Ingest{Type: api.GRPCType, GRPC: &ingest}} +} + +func NewKafkaParams(name string, ingest api.IngestKafka) StageParam { + return StageParam{Name: name, Ingest: &Ingest{Type: api.KafkaType, Kafka: &ingest}} +} + +func NewAggregateParams(name string, aggs []api.AggregateDefinition) StageParam { + return StageParam{Name: name, Extract: &Extract{Type: api.AggregateType, Aggregates: aggs}} +} + +func NewTransformGenericParams(name string, gen api.TransformGeneric) StageParam { + return StageParam{Name: name, Transform: &Transform{Type: api.GenericType, Generic: &gen}} +} + +func NewTransformFilterParams(name string, filter api.TransformFilter) StageParam { + return StageParam{Name: name, Transform: &Transform{Type: api.FilterType, Filter: &filter}} +} + +func NewTransformNetworkParams(name string, nw api.TransformNetwork) StageParam { + return StageParam{Name: name, Transform: &Transform{Type: api.NetworkType, Network: &nw}} +} + +func NewConnTrackParams(name string, ct api.ConnTrack) StageParam { + return StageParam{Name: name, Extract: &Extract{Type: api.ConnTrackType, ConnTrack: &ct}} +} + +func NewEncodePrometheusParams(name string, prom api.PromEncode) StageParam { + return StageParam{Name: name, Encode: &Encode{Type: api.PromType, Prom: &prom}} +} + +func NewEncodeKafkaParams(name string, kafka api.EncodeKafka) StageParam { + return StageParam{Name: name, Encode: &Encode{Type: api.KafkaType, Kafka: &kafka}} +} + +func NewWriteStdoutParams(name string, stdout api.WriteStdout) StageParam { + return StageParam{Name: name, Write: &Write{Type: api.StdoutType, Stdout: &stdout}} +} + +func NewWriteLokiParams(name string, loki api.WriteLoki) StageParam { + return StageParam{Name: name, Write: &Write{Type: api.LokiType, Loki: &loki}} +} diff --git a/pkg/operational/health/health.go b/pkg/operational/health/health.go index 145eb5022..9be24360f 100644 --- a/pkg/operational/health/health.go +++ b/pkg/operational/health/health.go @@ -43,10 +43,10 @@ func (hs *Server) Serve() { } } -func NewHealthServer(pipeline *pipeline.Pipeline) *Server { +func NewHealthServer(opts *config.Options, pipeline *pipeline.Pipeline) *Server { handler := healthcheck.NewHandler() - address := net.JoinHostPort(defaultServerHost, config.Opt.Health.Port) + address := net.JoinHostPort(defaultServerHost, opts.Health.Port) handler.AddLivenessCheck("PipelineCheck", pipeline.IsAlive()) handler.AddReadinessCheck("PipelineCheck", pipeline.IsReady()) diff --git a/pkg/operational/health/health_test.go b/pkg/operational/health/health_test.go index 660676735..df9d1edf5 100644 --- a/pkg/operational/health/health_test.go +++ b/pkg/operational/health/health_test.go @@ -36,9 +36,9 @@ func TestNewHealthServer(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - config.Opt.Health.Port = tt.args.port - expectedAddr := fmt.Sprintf("0.0.0.0:%s", config.Opt.Health.Port) - server := NewHealthServer(&tt.args.pipeline) + opts := config.Options{Health: config.Health{Port: tt.args.port}} + expectedAddr := fmt.Sprintf("0.0.0.0:%s", 
opts.Health.Port) + server := NewHealthServer(&opts, &tt.args.pipeline) require.NotNil(t, server) require.Equal(t, expectedAddr, server.address) diff --git a/pkg/pipeline/aggregate_prom_test.go b/pkg/pipeline/aggregate_prom_test.go index e02662608..1f839a06d 100644 --- a/pkg/pipeline/aggregate_prom_test.go +++ b/pkg/pipeline/aggregate_prom_test.go @@ -99,13 +99,13 @@ parameters: ` var err error - v := test.InitConfig(t, yamlConfig) + v, cfg := test.InitConfig(t, yamlConfig) require.NotNil(t, v) - extractAggregate, err := extract.NewExtractAggregate(config.Parameters[0]) + extractAggregate, err := extract.NewExtractAggregate(cfg.Parameters[0]) require.NoError(t, err) - promEncode, err := encode.NewEncodeProm(config.Parameters[1]) + promEncode, err := encode.NewEncodeProm(cfg.Parameters[1]) require.Equal(t, err, nil) // Test cases diff --git a/pkg/pipeline/conntrack_integ_test.go b/pkg/pipeline/conntrack_integ_test.go index e116d95d2..072f7bd68 100644 --- a/pkg/pipeline/conntrack_integ_test.go +++ b/pkg/pipeline/conntrack_integ_test.go @@ -93,10 +93,10 @@ func TestConnTrack(t *testing.T) { // connection record with specific values was written. var mainPipeline *Pipeline var err error - v := test.InitConfig(t, testConfigConntrack) + v, cfg := test.InitConfig(t, testConfigConntrack) require.NotNil(t, v) - mainPipeline, err = NewPipeline() + mainPipeline, err = NewPipeline(cfg) require.NoError(t, err) go mainPipeline.Run() diff --git a/pkg/pipeline/encode/encode_kafka_test.go b/pkg/pipeline/encode/encode_kafka_test.go index e0af22c1a..215642ee2 100644 --- a/pkg/pipeline/encode/encode_kafka_test.go +++ b/pkg/pipeline/encode/encode_kafka_test.go @@ -55,10 +55,10 @@ func (f *fakeKafkaWriter) WriteMessages(ctx context.Context, msg ...kafkago.Mess } func initNewEncodeKafka(t *testing.T) Encoder { - v := test.InitConfig(t, testKafkaConfig) + v, cfg := test.InitConfig(t, testKafkaConfig) require.NotNil(t, v) - newEncode, err := NewEncodeKafka(config.Parameters[0]) + newEncode, err := NewEncodeKafka(cfg.Parameters[0]) require.NoError(t, err) return newEncode } diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/encode/encode_prom_test.go index 6b484dabc..04365a0fb 100644 --- a/pkg/pipeline/encode/encode_prom_test.go +++ b/pkg/pipeline/encode/encode_prom_test.go @@ -65,10 +65,10 @@ parameters: ` func initNewEncodeProm(t *testing.T) Encoder { - v := test.InitConfig(t, testConfig) + v, cfg := test.InitConfig(t, testConfig) require.NotNil(t, v) - newEncode, err := NewEncodeProm(config.Parameters[0]) + newEncode, err := NewEncodeProm(cfg.Parameters[0]) require.Equal(t, err, nil) return newEncode } diff --git a/pkg/pipeline/extract/aggregate/aggregates_test.go b/pkg/pipeline/extract/aggregate/aggregates_test.go index 382767354..43c3d6c52 100644 --- a/pkg/pipeline/extract/aggregate/aggregates_test.go +++ b/pkg/pipeline/extract/aggregate/aggregates_test.go @@ -43,9 +43,9 @@ parameters: Operation: "avg" RecordKey: "value" ` - v := test.InitConfig(t, yamlConfig) + v, cfg := test.InitConfig(t, yamlConfig) require.NotNil(t, v) - aggregates, err := NewAggregatesFromConfig(config.Parameters[0].Extract.Aggregates) + aggregates, err := NewAggregatesFromConfig(cfg.Parameters[0].Extract.Aggregates) require.NoError(t, err) return aggregates diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go index 17c030b58..da6a4eae6 100644 --- a/pkg/pipeline/extract/extract_aggregate_test.go +++ b/pkg/pipeline/extract/extract_aggregate_test.go @@ -77,10 +77,10 @@ 
parameters: ` var err error - v := test.InitConfig(t, yamlConfig) + v, cfg := test.InitConfig(t, yamlConfig) require.NotNil(t, v) - extractAggregate, err := NewExtractAggregate(config.Parameters[0]) + extractAggregate, err := NewExtractAggregate(cfg.Parameters[0]) require.NoError(t, err) // Test cases diff --git a/pkg/pipeline/ingest/ingest_kafka_test.go b/pkg/pipeline/ingest/ingest_kafka_test.go index edea01198..cc42d49c8 100644 --- a/pkg/pipeline/ingest/ingest_kafka_test.go +++ b/pkg/pipeline/ingest/ingest_kafka_test.go @@ -70,10 +70,10 @@ parameters: ` func initNewIngestKafka(t *testing.T, configTemplate string) Ingester { - v := test.InitConfig(t, configTemplate) + v, cfg := test.InitConfig(t, configTemplate) require.NotNil(t, v) - newIngest, err := NewIngestKafka(config.Parameters[0]) + newIngest, err := NewIngestKafka(cfg.Parameters[0]) require.NoError(t, err) return newIngest } diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index ec8202631..c26dbfb85 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -46,12 +46,12 @@ type Pipeline struct { } // NewPipeline defines the pipeline elements -func NewPipeline() (*Pipeline, error) { +func NewPipeline(cfg *config.ConfigFileStruct) (*Pipeline, error) { log.Debugf("entering NewPipeline") - stages := config.PipeLine + stages := cfg.Pipeline log.Debugf("stages = %v ", stages) - configParams := config.Parameters + configParams := cfg.Parameters log.Debugf("configParams = %v ", configParams) build := newBuilder(configParams, stages) diff --git a/pkg/pipeline/pipeline_builder_test.go b/pkg/pipeline/pipeline_builder_test.go index ef984af1b..feb37af37 100644 --- a/pkg/pipeline/pipeline_builder_test.go +++ b/pkg/pipeline/pipeline_builder_test.go @@ -22,10 +22,10 @@ const baseConfig = `parameters: ` func TestConnectionVerification_Pass(t *testing.T) { - test.InitConfig(t, baseConfig+`pipeline: + _, cfg := test.InitConfig(t, baseConfig+`pipeline: - { follows: ingest1, name: write1 } `) - _, err := NewPipeline() + _, err := NewPipeline(cfg) assert.NoError(t, err) } @@ -89,8 +89,8 @@ pipeline: failingNodeName: "write1", }} { t.Run(tc.description, func(t *testing.T) { - test.InitConfig(t, tc.config) - _, err := NewPipeline() + _, cfg := test.InitConfig(t, tc.config) + _, err := NewPipeline(cfg) require.Error(t, err) require.IsType(t, &Error{}, err, err.Error()) assert.Equal(t, tc.failingNodeName, err.(*Error).StageName, err.Error()) diff --git a/pkg/pipeline/pipeline_test.go b/pkg/pipeline/pipeline_test.go index 6b58a26df..7ab3cfdf2 100644 --- a/pkg/pipeline/pipeline_test.go +++ b/pkg/pipeline/pipeline_test.go @@ -49,7 +49,7 @@ parameters: - name: write1 write: type: loki - loki: + loki: { url: 'http://loki:3100/' } ` func Test_transformToLoki(t *testing.T) { @@ -59,9 +59,9 @@ func Test_transformToLoki(t *testing.T) { require.NoError(t, err) transformed = append(transformed, transform.Transform(input)...) 
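	// test.InitConfig now also returns the parsed config.ConfigFileStruct, so
	// tests read stage parameters from it instead of the former package globals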
- v := test.InitConfig(t, yamlConfigNoParams) + v, cfg := test.InitConfig(t, yamlConfigNoParams) require.NotNil(t, v) - loki, err := write.NewWriteLoki(config.Parameters[0]) + loki, err := write.NewWriteLoki(cfg.Parameters[0]) require.NoError(t, err) loki.Write(transformed) } @@ -108,9 +108,9 @@ parameters: func Test_SimplePipeline(t *testing.T) { var mainPipeline *Pipeline var err error - test.InitConfig(t, configTemplate) + _, cfg := test.InitConfig(t, configTemplate) - mainPipeline, err = NewPipeline() + mainPipeline, err = NewPipeline(cfg) require.NoError(t, err) // The file ingester reads the entire file, pushes it down the pipeline, and then exits @@ -135,7 +135,7 @@ func Test_SimplePipeline(t *testing.T) { func TestGRPCProtobuf(t *testing.T) { port, err := test2.FreeTCPPort() require.NoError(t, err) - test.InitConfig(t, fmt.Sprintf(`--- + _, cfg := test.InitConfig(t, fmt.Sprintf(`--- log-level: debug pipeline: - name: ingest1 @@ -154,7 +154,7 @@ parameters: format: json `, port)) - pipe, err := NewPipeline() + pipe, err := NewPipeline(cfg) require.NoError(t, err) capturedOut, w, _ := os.Pipe() @@ -234,13 +234,13 @@ parameters: func BenchmarkPipeline(b *testing.B) { logrus.StandardLogger().SetLevel(logrus.ErrorLevel) t := &testing.T{} - test.InitConfig(t, strings.ReplaceAll(configTemplate, "type: file", "type: file_chunks")) + _, cfg := test.InitConfig(t, strings.ReplaceAll(configTemplate, "type: file", "type: file_chunks")) if t.Failed() { b.Fatalf("unexpected error loading config") } for n := 0; n < b.N; n++ { b.StopTimer() - p, err := NewPipeline() + p, err := NewPipeline(cfg) if err != nil { t.Fatalf("unexpected error %s", err) } diff --git a/pkg/pipeline/transform/transform_filter_test.go b/pkg/pipeline/transform/transform_filter_test.go index a963c09c8..c5453f1a6 100644 --- a/pkg/pipeline/transform/transform_filter_test.go +++ b/pkg/pipeline/transform/transform_filter_test.go @@ -114,10 +114,10 @@ func TestNewTransformFilterRemoveEntryIfDoesntExists(t *testing.T) { require.Equal(t, output, []config.GenericMap{}) } func InitNewTransformFilter(t *testing.T, configFile string) Transformer { - v := test.InitConfig(t, configFile) + v, cfg := test.InitConfig(t, configFile) require.NotNil(t, v) - config := config.Parameters[0] + config := cfg.Parameters[0] newTransform, err := NewTransformFilter(config) require.NoError(t, err) return newTransform diff --git a/pkg/pipeline/transform/transform_generic_test.go b/pkg/pipeline/transform/transform_generic_test.go index dd0652421..3e989cdaf 100644 --- a/pkg/pipeline/transform/transform_generic_test.go +++ b/pkg/pipeline/transform/transform_generic_test.go @@ -130,10 +130,10 @@ func TestNewTransformGenericMaintainTrue(t *testing.T) { } func InitNewTransformGeneric(t *testing.T, configFile string) Transformer { - v := test.InitConfig(t, configFile) + v, cfg := test.InitConfig(t, configFile) require.NotNil(t, v) - config := config.Parameters[0] + config := cfg.Parameters[0] newTransform, err := NewTransformGeneric(config) require.NoError(t, err) return newTransform diff --git a/pkg/pipeline/transform/transform_network_test.go b/pkg/pipeline/transform/transform_network_test.go index 9144591a0..5ce80bfe5 100644 --- a/pkg/pipeline/transform/transform_network_test.go +++ b/pkg/pipeline/transform/transform_network_test.go @@ -202,9 +202,9 @@ parameters: } func InitNewTransformNetwork(t *testing.T, configFile string) Transformer { - v := test.InitConfig(t, configFile) + v, cfg := test.InitConfig(t, configFile) require.NotNil(t, v) - config := 
diff --git a/pkg/pipeline/transform/transform_network_test.go b/pkg/pipeline/transform/transform_network_test.go
index 9144591a0..5ce80bfe5 100644
--- a/pkg/pipeline/transform/transform_network_test.go
+++ b/pkg/pipeline/transform/transform_network_test.go
@@ -202,9 +202,9 @@ parameters:
 }
 
 func InitNewTransformNetwork(t *testing.T, configFile string) Transformer {
-	v := test.InitConfig(t, configFile)
+	v, cfg := test.InitConfig(t, configFile)
 	require.NotNil(t, v)
 
-	config := config.Parameters[0]
+	config := cfg.Parameters[0]
 	newTransform, err := NewTransformNetwork(config)
 	require.NoError(t, err)
 	return newTransform
diff --git a/pkg/pipeline/transform_multiple_test.go b/pkg/pipeline/transform_multiple_test.go
index f7d7f377b..d7ce5d9c8 100644
--- a/pkg/pipeline/transform_multiple_test.go
+++ b/pkg/pipeline/transform_multiple_test.go
@@ -87,10 +87,10 @@ parameters:
 func TestTransformMultiple(t *testing.T) {
 	var mainPipeline *Pipeline
 	var err error
-	v := test.InitConfig(t, testConfigTransformMultiple)
+	v, cfg := test.InitConfig(t, testConfigTransformMultiple)
 	require.NotNil(t, v)
 
-	mainPipeline, err = NewPipeline()
+	mainPipeline, err = NewPipeline(cfg)
 	require.NoError(t, err)
 
 	// The file ingester reads the entire file, pushes it down the pipeline, and then exits
diff --git a/pkg/pipeline/transform_topk_test.go b/pkg/pipeline/transform_topk_test.go
index 4bc20c7b9..2732c4c06 100644
--- a/pkg/pipeline/transform_topk_test.go
+++ b/pkg/pipeline/transform_topk_test.go
@@ -101,10 +101,10 @@ parameters:
 func TestAggregateTopk(t *testing.T) {
 	var mainPipeline *Pipeline
 	var err error
-	v := test.InitConfig(t, testConfigAggregateTopK)
+	v, cfg := test.InitConfig(t, testConfigAggregateTopK)
 	require.NotNil(t, v)
 
-	mainPipeline, err = NewPipeline()
+	mainPipeline, err = NewPipeline(cfg)
 	require.NoError(t, err)
 
 	// The file ingester reads the entire file, pushes it down the pipeline, and then exits
diff --git a/pkg/pipeline/utils/params_parse.go b/pkg/pipeline/utils/params_parse.go
index c056384f4..0406db8ce 100644
--- a/pkg/pipeline/utils/params_parse.go
+++ b/pkg/pipeline/utils/params_parse.go
@@ -16,43 +16,3 @@
  */
 package utils
-
-import (
-	"encoding/json"
-
-	"github.com/netobserv/flowlogs-pipeline/pkg/config"
-	log "github.com/sirupsen/logrus"
-)
-
-// ParamString returns its corresponding (json) string from config.parameters for specified params structure
-func ParamString(params config.StageParam, stage string, stageType string) string {
-	log.Debugf("entering paramString")
-	log.Debugf("params = %v, stage = %s, stageType = %s", params, stage, stageType)
-
-	var configMap []map[string]interface{}
-	var err error
-	err = json.Unmarshal([]byte(config.Opt.Parameters), &configMap)
-	if err != nil {
-		return ""
-	}
-	log.Debugf("configMap = %v", configMap)
-
-	var returnBytes []byte
-	for index := range config.Parameters {
-		paramsEntry := &config.Parameters[index]
-		if params.Name == paramsEntry.Name {
-			log.Debugf("paramsEntry = %v", paramsEntry)
-			log.Debugf("data[index][stage] = %v", configMap[index][stage])
-			// convert back to string
-			subField := configMap[index][stage].(map[string]interface{})
-			log.Debugf("subField = %v", subField)
-			returnBytes, err = json.Marshal(subField[stageType])
-			if err != nil {
-				return ""
-			}
-			break
-		}
-	}
-	log.Debugf("returnBytes = %s", string(returnBytes))
-	return string(returnBytes)
-}
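Deleting ParamString is the payoff of the typed model: a stage no longer re-unmarshals the global JSON options to locate its own section; it reads the section straight off its StageParam. A sketch of the typed replacement, mirroring the NewWriteLoki change in the next file; the helper name lokiAPIConfig is hypothetical, while the field accesses and SetDefaults call are as written in this patch.

    package write

    import (
    	"github.com/netobserv/flowlogs-pipeline/pkg/api"
    	"github.com/netobserv/flowlogs-pipeline/pkg/config"
    )

    // lokiAPIConfig reads the typed Loki section off the StageParam, then
    // applies defaults explicitly, instead of unmarshaling twice to pick up
    // defaults as the deleted ParamString-based path did.
    func lokiAPIConfig(params config.StageParam) api.WriteLoki {
    	cfg := api.WriteLoki{}
    	if params.Write != nil && params.Write.Loki != nil {
    		cfg = *params.Write.Loki
    	}
    	cfg.SetDefaults() // explicit defaults replace the double-unmarshal trick
    	return cfg
    }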
diff --git a/pkg/pipeline/write/write_loki.go b/pkg/pipeline/write/write_loki.go
index 60a4d98b5..d0f06e693 100644
--- a/pkg/pipeline/write/write_loki.go
+++ b/pkg/pipeline/write/write_loki.go
@@ -18,7 +18,6 @@
 package write
 
 import (
-	"encoding/json"
 	"fmt"
 	"math"
 	"strings"
@@ -245,34 +244,31 @@ func (l *Loki) processRecords() {
 // NewWriteLoki creates a Loki writer from configuration
 func NewWriteLoki(params config.StageParam) (*Loki, error) {
 	log.Debugf("entering NewWriteLoki")
-
-	writeLokiString := pUtils.ParamString(params, "write", "loki")
-	log.Debugf("writeLokiString = %s", writeLokiString)
-	var jsonWriteLoki = api.GetWriteLokiDefaults()
-	err := json.Unmarshal([]byte(writeLokiString), &jsonWriteLoki)
-	if err != nil {
-		return nil, err
+	lokiConfigIn := api.WriteLoki{}
+	if params.Write != nil && params.Write.Loki != nil {
+		lokiConfigIn = *params.Write.Loki
 	}
-	// need to combine defaults with parameters that are provided in the config yaml file
-	if err = jsonWriteLoki.Validate(); err != nil {
+	lokiConfigIn.SetDefaults()
+
+	if err := lokiConfigIn.Validate(); err != nil {
 		return nil, fmt.Errorf("the provided config is not valid: %w", err)
 	}
 
-	lokiConfig, buildconfigErr := buildLokiConfig(&jsonWriteLoki)
+	lokiConfig, buildconfigErr := buildLokiConfig(&lokiConfigIn)
 	if buildconfigErr != nil {
-		return nil, err
+		return nil, buildconfigErr
 	}
 
-	client, NewWithLoggerErr := loki.NewWithLogger(lokiConfig, logAdapter.NewLogger(log.WithField("module", "export/loki")))
-	if NewWithLoggerErr != nil {
-		return nil, err
+	client, newWithLoggerErr := loki.NewWithLogger(lokiConfig, logAdapter.NewLogger(log.WithField("module", "export/loki")))
+	if newWithLoggerErr != nil {
+		return nil, newWithLoggerErr
 	}
 
 	in := make(chan config.GenericMap, channelSize)
 
 	l := &Loki{
 		lokiConfig: lokiConfig,
-		apiConfig:  jsonWriteLoki,
+		apiConfig:  lokiConfigIn,
 		client:     client,
 		timeNow:    time.Now,
 		exitChan:   pUtils.ExitChannel(),
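Note the behavior change bundled with the refactor: defaults now come from an explicit SetDefaults call, and the URL has to be provided, which is why every Loki test below gains a url field and the first test asserts a 102400 default batch size. A sketch of the contract this implies for api.WriteLoki follows; the method bodies and the error message are assumptions inferred from those tests, not the actual implementation.

    package api

    import "errors"

    // SetDefaults fills unset fields; the batch size value is the default
    // asserted in write_loki_test.go below (assumed to live here).
    func (w *WriteLoki) SetDefaults() {
    	if w.BatchSize == 0 {
    		w.BatchSize = 102400
    	}
    }

    // Validate is assumed to reject an empty URL, consistent with the tests
    // below now always setting one explicitly.
    func (w *WriteLoki) Validate() error {
    	if w.URL == "" {
    		return errors.New("url can't be empty")
    	}
    	return nil
    }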
diff --git a/pkg/pipeline/write/write_loki_test.go b/pkg/pipeline/write/write_loki_test.go
index 696d8a358..73933e312 100644
--- a/pkg/pipeline/write/write_loki_test.go
+++ b/pkg/pipeline/write/write_loki_test.go
@@ -24,7 +24,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/netobserv/flowlogs-pipeline/pkg/config"
 	"github.com/netobserv/flowlogs-pipeline/pkg/test"
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/assert"
@@ -64,19 +63,21 @@ parameters:
           baz: bae
           tiki: taka
 `
-	v := test.InitConfig(t, yamlConfig)
+	v, cfg := test.InitConfig(t, yamlConfig)
 	require.NotNil(t, v)
 
-	loki, err := NewWriteLoki(config.Parameters[0])
+	loki, err := NewWriteLoki(cfg.Parameters[0])
 	require.NoError(t, err)
 
 	assert.Equal(t, "https://foo:8888/loki/api/v1/push", loki.lokiConfig.URL.String())
 	assert.Equal(t, "theTenant", loki.lokiConfig.TenantID)
 	assert.Equal(t, time.Minute, loki.lokiConfig.BatchWait)
-	assert.NotZero(t, loki.lokiConfig.BatchSize)
-	assert.Equal(t, loki.apiConfig.BatchSize, loki.lokiConfig.BatchSize)
 	minBackoff, _ := time.ParseDuration(loki.apiConfig.MinBackoff)
 	assert.Equal(t, minBackoff, loki.lokiConfig.BackoffConfig.MinBackoff)
+
+	// Make sure default batch size is set
+	assert.Equal(t, 102400, loki.lokiConfig.BatchSize)
+	assert.Equal(t, loki.apiConfig.BatchSize, loki.lokiConfig.BatchSize)
 }
 
 func TestLoki_ProcessRecord(t *testing.T) {
@@ -89,6 +90,7 @@ parameters:
     write:
       type: loki
       loki:
+        url: http://loki:3100/
         timestampLabel: ts
         ignoreList:
           - ignored
@@ -98,10 +100,10 @@ parameters:
           - foo
           - bar
 `
-	v := test.InitConfig(t, yamlConfig)
+	v, cfg := test.InitConfig(t, yamlConfig)
 	require.NotNil(t, v)
 
-	loki, err := NewWriteLoki(config.Parameters[0])
+	loki, err := NewWriteLoki(cfg.Parameters[0])
 	require.NoError(t, err)
 
 	fe := fakeEmitter{}
@@ -149,12 +151,13 @@ parameters:
     write:
       type: loki
       loki:
+        url: http://loki:3100/
         timestampScale: %s
 `, testCase.unit)
-			v := test.InitConfig(t, yamlConf)
+			v, cfg := test.InitConfig(t, yamlConf)
 			require.NotNil(t, v)
 
-			loki, err := NewWriteLoki(config.Parameters[0])
+			loki, err := NewWriteLoki(cfg.Parameters[0])
 			require.NoError(t, err)
 
 			fe := fakeEmitter{}
@@ -176,7 +179,7 @@ parameters:
   - name: write1
     write:
       type: loki
-      loki:
+      loki: { url: http://loki:3100/ }
 `
 
 // Tests those cases where the timestamp can't be extracted and reports the current time
@@ -192,10 +195,10 @@ func TestTimestampExtraction_LocalTime(t *testing.T) {
 			{name: "zero ts value", tsLabel: "ts", input: map[string]interface{}{"ts": 0}},
 		} {
 			t.Run(testCase.name, func(t *testing.T) {
-				v := test.InitConfig(t, yamlConfigNoParams)
+				v, cfg := test.InitConfig(t, yamlConfigNoParams)
 				require.NotNil(t, v)
 
-				loki, err := NewWriteLoki(config.Parameters[0])
+				loki, err := NewWriteLoki(cfg.Parameters[0])
 				require.NoError(t, err)
 
 				loki.apiConfig.TimestampLabel = testCase.tsLabel
@@ -227,16 +230,17 @@ parameters:
     write:
       type: loki
       loki:
+        url: http://loki:3100/
         labels:
           - "fo.o"
           - "ba-r"
           - "ba/z"
           - "ignored?"
 `
-	v := test.InitConfig(t, yamlConfig)
+	v, cfg := test.InitConfig(t, yamlConfig)
 	require.NotNil(t, v)
 
-	loki, err := NewWriteLoki(config.Parameters[0])
+	loki, err := NewWriteLoki(cfg.Parameters[0])
 	require.NoError(t, err)
 
 	fe := fakeEmitter{}
@@ -268,8 +272,8 @@ parameters:
     loki:
       url: %s
 `, fakeLoki.URL)
-	test.InitConfig(t, yamlConfig)
-	loki, err := NewWriteLoki(config.Parameters[0])
+	_, cfg := test.InitConfig(t, yamlConfig)
+	loki, err := NewWriteLoki(cfg.Parameters[0])
 	require.NoError(t, err)
 
 	require.NoError(t, loki.ProcessRecord(map[string]interface{}{"foo": "bar", "baz": "bae"}))
diff --git a/pkg/test/utils.go b/pkg/test/utils.go
index bf1680a75..3ce64c242 100644
--- a/pkg/test/utils.go
+++ b/pkg/test/utils.go
@@ -25,7 +25,6 @@ import (
 	"io/ioutil"
 	"os"
 	"os/exec"
-	"reflect"
 	"strings"
 	"testing"
 	"time"
@@ -58,7 +57,7 @@ func GetIngestMockEntry(missingKey bool) config.GenericMap {
 	return entry
 }
 
-func InitConfig(t *testing.T, conf string) *viper.Viper {
+func InitConfig(t *testing.T, conf string) (*viper.Viper, *config.ConfigFileStruct) {
 	var json = jsoniter.ConfigCompatibleWithStandardLibrary
 	yamlConfig := []byte(conf)
 	v := viper.New()
@@ -67,46 +66,29 @@
 	err := v.ReadConfig(r)
 	require.NoError(t, err)
 
-	// set up global config info
-	// first clear out the config structures in case they were set by a previous instantiation
-	p1 := reflect.ValueOf(&config.PipeLine).Elem()
-	p1.Set(reflect.Zero(p1.Type()))
-	p2 := reflect.ValueOf(&config.Parameters).Elem()
-	p2.Set(reflect.Zero(p2.Type()))
-
 	var b []byte
 	pipelineStr := v.Get("pipeline")
 	b, err = json.Marshal(&pipelineStr)
 	if err != nil {
 		fmt.Printf("error marshaling: %v\n", err)
-		return nil
+		return nil, nil
 	}
-	config.Opt.PipeLine = string(b)
+	opts := config.Options{}
+	opts.PipeLine = string(b)
 	parametersStr := v.Get("parameters")
 	b, err = json.Marshal(&parametersStr)
 	if err != nil {
 		fmt.Printf("error marshaling: %v\n", err)
-		return nil
-	}
-	config.Opt.Parameters = string(b)
-	err = json.Unmarshal([]byte(config.Opt.PipeLine), &config.PipeLine)
-	if err != nil {
-		fmt.Printf("error unmarshaling: %v\n", err)
-		return nil
+		return nil, nil
 	}
-	err = json.Unmarshal([]byte(config.Opt.Parameters), &config.Parameters)
-	if err != nil {
-		fmt.Printf("error unmarshaling: %v\n", err)
-		return nil
-	}
-
-	err = config.ParseConfig()
+	opts.Parameters = string(b)
+	out, err := config.ParseConfig(opts)
 	if err != nil {
 		fmt.Printf("error in parsing config file: %v \n", err)
-		return nil
+		return nil, nil
 	}
-	return v
+	return v, &out
 }
 
 func GetExtractMockEntry() config.GenericMap {
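Because InitConfig now returns a freshly parsed ConfigFileStruct per call instead of mutating package globals, the reflect-based reset at the top of the old helper becomes unnecessary, and two configs can coexist within one test. A small hypothetical test sketching what that buys, assuming the Stage entries in cfg.Pipeline expose a Name field as the pipeline builder tests suggest.

    package test

    import (
    	"testing"

    	"github.com/stretchr/testify/require"
    )

    func Test_ConfigsAreIsolated(t *testing.T) {
    	// Each call parses its own config; neither can clobber the other,
    	// which is what the deleted reflect.Zero reset used to guard against.
    	_, cfg1 := InitConfig(t, "pipeline:\n  - name: ingest1\n")
    	_, cfg2 := InitConfig(t, "pipeline:\n  - name: ingest2\n")
    	require.Equal(t, "ingest1", cfg1.Pipeline[0].Name)
    	require.Equal(t, "ingest2", cfg2.Pipeline[0].Name)
    }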
diff --git a/pkg/test/utils_test.go b/pkg/test/utils_test.go
index f317b6a72..0e825827b 100644
--- a/pkg/test/utils_test.go
+++ b/pkg/test/utils_test.go
@@ -35,8 +35,9 @@ func Test_GetIngestMockEntry(t *testing.T) {
 }
 
 func Test_InitConfig(t *testing.T) {
-	viper := InitConfig(t, "")
+	viper, out := InitConfig(t, "")
 	require.NotNil(t, viper)
+	require.NotNil(t, out)
 }
 
 func Test_GetExtractMockEntry(t *testing.T) {