diff --git a/pkg/confgen/confgen.go b/pkg/confgen/confgen.go
index 9fc608dab..8e478a6b2 100644
--- a/pkg/confgen/confgen.go
+++ b/pkg/confgen/confgen.go
@@ -78,19 +78,24 @@ func (cg *ConfGen) Run() error {
 		return err
 	}
 
-	definitionFiles := cg.GetDefinitionFiles(cg.opts.SrcFolder)
+	definitionFiles := getDefinitionFiles(cg.opts.SrcFolder)
 	for _, definitionFile := range definitionFiles {
-		err := cg.parseFile(definitionFile)
+		b, err := ioutil.ReadFile(definitionFile)
 		if err != nil {
-			log.Debugf("cg.parseFile err: %v ", err)
+			log.Debugf("ioutil.ReadFile err: %v ", err)
+			continue
+		}
+		err = cg.ParseDefinition(definitionFile, b)
+		if err != nil {
+			log.Debugf("cg.parseDefinition err: %v ", err)
 			continue
 		}
 	}
 
-	cg.Dedupe()
+	cg.dedupe()
 
 	if len(cg.opts.GenerateStages) != 0 {
-		config := cg.GenerateTruncatedConfig(cg.opts.GenerateStages)
+		config := cg.GenerateTruncatedConfig()
 		err = cg.writeConfigFile(cg.opts.DestConfFile, config)
 		if err != nil {
 			log.Debugf("cg.GenerateTruncatedConfig err: %v ", err)
@@ -121,47 +126,28 @@ func (cg *ConfGen) Run() error {
 	return nil
 }
 
-func (cg *ConfGen) checkHeader(fileName string) error {
-	// check header
-	f, err := os.OpenFile(fileName, os.O_RDONLY, 0644)
-	if err != nil {
-		log.Debugf("os.OpenFile error: %v ", err)
-		return err
-	}
+func checkHeader(bytes []byte) error {
 	header := make([]byte, len(definitionHeader))
-	_, err = f.Read(header)
-	if err != nil || string(header) != definitionHeader {
-		log.Debugf("Wrong header file: %s ", fileName)
+	copy(header, bytes)
+	if string(header) != definitionHeader {
 		return fmt.Errorf("wrong header")
 	}
-	err = f.Close()
-	if err != nil {
-		log.Debugf("f.Close err: %v ", err)
-		return err
-	}
-
 	return nil
 }
 
-func (cg *ConfGen) parseFile(fileName string) error {
-
+func (cg *ConfGen) ParseDefinition(name string, bytes []byte) error {
 	// check header
-	err := cg.checkHeader(fileName)
+	err := checkHeader(bytes)
 	if err != nil {
-		log.Debugf("cg.checkHeader err: %v ", err)
+		log.Debugf("%s cg.checkHeader err: %v ", name, err)
 		return err
 	}
 
 	// parse yaml
 	var defFile DefFile
-	yamlFile, err := ioutil.ReadFile(fileName)
-	if err != nil {
-		log.Debugf("ioutil.ReadFile err: %v ", err)
-		return err
-	}
-	err = yaml.Unmarshal(yamlFile, &defFile)
+	err = yaml.Unmarshal(bytes, &defFile)
 	if err != nil {
-		log.Debugf("yaml.Unmarshal err: %v ", err)
+		log.Debugf("%s yaml.Unmarshal err: %v ", name, err)
 		return err
 	}
 
@@ -169,14 +155,15 @@ func (cg *ConfGen) parseFile(fileName string) error {
 	for _, skipTag := range cg.opts.SkipWithTags {
 		for _, tag := range defFile.Tags {
 			if skipTag == tag {
-				return fmt.Errorf("skipping definition %s due to skip tag %s", fileName, tag)
+				log.Infof("skipping definition %s due to skip tag %s", name, tag)
+				return nil
 			}
 		}
 	}
 
 	// parse definition
 	definition := Definition{
-		FileName:    fileName,
+		FileName:    name,
 		Description: defFile.Description,
 		Details:     defFile.Details,
 		Usage:       defFile.Usage,
@@ -216,7 +203,7 @@ func (cg *ConfGen) parseFile(fileName string) error {
 	return nil
 }
 
-func (*ConfGen) GetDefinitionFiles(rootPath string) []string {
+func getDefinitionFiles(rootPath string) []string {
 	var files []string
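
For illustration, a self-contained sketch of the byte-oriented flow this refactor introduces: validate the `#flp_confgen` header on an in-memory slice, then unmarshal the same bytes. `DefStub` and the sample YAML are hypothetical stand-ins; only the header constant and the checkHeader logic mirror this patch.

package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

const definitionHeader = "#flp_confgen"

// DefStub is a hypothetical stand-in for the repo's DefFile type.
type DefStub struct {
	Description string   `yaml:"description"`
	Tags        []string `yaml:"tags"`
}

func checkHeader(b []byte) error {
	header := make([]byte, len(definitionHeader))
	// copy() never over-reads: shorter inputs leave trailing zero bytes,
	// so the comparison below still fails as intended.
	copy(header, b)
	if string(header) != definitionHeader {
		return fmt.Errorf("wrong header")
	}
	return nil
}

func main() {
	src := []byte("#flp_confgen\ndescription: demo\ntags: [kubernetes]\n")
	if err := checkHeader(src); err != nil {
		panic(err)
	}
	var def DefStub
	// The header line is a YAML comment, so the same bytes unmarshal cleanly.
	if err := yaml.Unmarshal(src, &def); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", def) // {Description:demo Tags:[kubernetes]}
}
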
diff --git a/pkg/confgen/confgen_test.go b/pkg/confgen/confgen_test.go
index 2ceef1f88..518e9e026 100644
--- a/pkg/confgen/confgen_test.go
+++ b/pkg/confgen/confgen_test.go
@@ -26,30 +26,22 @@ import (
 
 	"github.com/netobserv/flowlogs-pipeline/pkg/api"
 	"github.com/netobserv/flowlogs-pipeline/pkg/config"
+	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/require"
 	"gopkg.in/yaml.v2"
 )
 
 func Test_checkHeader(t *testing.T) {
-	filename := "/tmp/header.check.txt"
-	fakeFilename := "/tmp/fake_file.does.exist"
 	wrongHeader := "#wrong_confgen"
-	cg := NewConfGen(&Options{})
-	err := cg.checkHeader(fakeFilename)
-	require.Error(t, err)
 
-	err = os.WriteFile(filename, []byte(wrongHeader), 0644)
-	require.NoError(t, err)
-	err = cg.checkHeader(filename)
+	err := checkHeader([]byte(wrongHeader))
 	require.Error(t, err)
 
-	err = os.WriteFile(filename, []byte(definitionHeader), 0644)
-	require.NoError(t, err)
-	err = cg.checkHeader(filename)
+	err = checkHeader([]byte(definitionHeader))
 	require.NoError(t, err)
 }
 
-const generalConfig = `#flp_confgen
+const shortConfig = `#flp_confgen
 description:
   test description
 ingest:
@@ -73,6 +65,41 @@ visualization:
         schemaVersion: "16"
 `
 
+const longConfig = `#flp_confgen
+description:
+  test description
+ingest:
+  collector:
+    port: 2155
+    portLegacy: 2156
+    hostName: 0.0.0.0
+transform:
+  generic:
+    rules:
+      - input: SrcAddr
+        output: srcIP
+encode:
+  type: prom
+  prom:
+    prefix: flp_
+    port: 9102
+write:
+  type: loki
+  loki:
+    url: http://loki:3100
+    staticLabels:
+      job: flowlogs-pipeline
+visualization:
+  type: grafana
+  grafana:
+    dashboards:
+      - name: "details"
+        title: "Flow-Logs to Metrics - Details"
+        time_from: "now-15m"
+        tags: "['flp','grafana','dashboard','details']"
+        schemaVersion: "16"
+`
+
 const networkDefs = `#flp_confgen
 description:
   test description
@@ -116,34 +143,110 @@ visualization:
     Test grafana title
 `
 
-func Test_parseFile(t *testing.T) {
-	fakeFilename := "/tmp/fake_file.does.exist"
-	filename := "/tmp/parse_file.check.txt"
+func Test_ParseDefinition(t *testing.T) {
 	cg := NewConfGen(&Options{})
-	err := cg.parseFile(fakeFilename)
-	require.Error(t, err)
-
-	err = os.WriteFile(filename, []byte(networkDefs), 0644)
-	require.NoError(t, err)
-	err = cg.parseFile(filename)
+	err := cg.ParseDefinition("def", []byte(networkDefs))
 	require.NoError(t, err)
 }
 
 func Test_getDefinitionFiles(t *testing.T) {
 	dirPath := "/tmp/getDefinitionFilesTest"
 	filename := "/def.yaml"
-	cg := NewConfGen(&Options{})
 	err := os.MkdirAll(dirPath, 0755)
 	require.NoError(t, err)
 	err = os.WriteFile(filepath.Join(dirPath, filename), []byte(networkDefs), 0644)
 	require.NoError(t, err)
 
-	files := cg.GetDefinitionFiles(dirPath)
+	files := getDefinitionFiles(dirPath)
 	require.Equal(t, 1, len(files))
 	expected := []string{path.Join(dirPath, filename)}
 	require.ElementsMatch(t, expected, files)
 }
-func Test_RunConfGen(t *testing.T) {
+func Test_RunShortConfGen(t *testing.T) {
+	// Prepare
+	dirPath := "/tmp/getDefinitionFilesTest"
+	cg := NewConfGen(&Options{
+		SrcFolder:                dirPath,
+		DestConfFile:             "/tmp/destConfigTest",
+		DestDocFile:              "/tmp/destDocTest",
+		DestGrafanaJsonnetFolder: "/tmp/destJsonnetTest",
+	})
+	err := os.MkdirAll(dirPath, 0755)
+	require.NoError(t, err)
+	err = os.WriteFile(filepath.Join(dirPath, configFileName), []byte(shortConfig), 0644)
+	require.NoError(t, err)
+	err = os.WriteFile(filepath.Join(dirPath, "def.yaml"), []byte(networkDefs), 0644)
+	require.NoError(t, err)
+
+	// Run
+	err = cg.Run()
+	require.NoError(t, err)
+
+	// Unmarshal output
+	type Output struct {
+		Pipeline   []config.Stage      `yaml:"pipeline"`
+		Parameters []config.StageParam `yaml:"parameters"`
+	}
+	destCfgBytes, err := ioutil.ReadFile("/tmp/destConfigTest")
+	require.NoError(t, err)
+	var out Output
+	err = yaml.Unmarshal(destCfgBytes, &out)
+	require.NoError(t, err)
+	require.Len(t, out.Pipeline, 4)
+	require.Len(t, out.Parameters, 4)
+
+	// Pipeline structure
+	require.Equal(t,
+		[]config.Stage(
+			[]config.Stage{{Name: "ingest_collector"},
+				{Name: "transform_network", Follows: "ingest_collector"},
+				{Name: "extract_aggregate", Follows: "transform_network"},
+				{Name: "encode_prom", Follows: "extract_aggregate"}}),
+		out.Pipeline,
+	)
+
+	// Expects ingest
+	require.Equal(t, &api.IngestCollector{
+		HostName:   "0.0.0.0",
+		Port:       2155,
+		PortLegacy: 2156,
+	}, out.Parameters[0].Ingest.Collector)
+
+	// Expects transform network
+	require.Len(t, out.Parameters[1].Transform.Network.Rules, 1)
+	require.Equal(t, api.NetworkTransformRule{
+		Input:      "testInput",
+		Output:     "testOutput",
+		Type:       "add_service",
+		Parameters: "proto",
+	}, out.Parameters[1].Transform.Network.Rules[0])
+
+	// Expects aggregates
+	require.Len(t, out.Parameters[2].Extract.Aggregates, 1)
+	require.Equal(t, api.AggregateDefinition{
+		Name:      "test_aggregates",
+		By:        api.AggregateBy{"service"},
+		Operation: "sum",
+		RecordKey: "test_record_key",
+	}, out.Parameters[2].Extract.Aggregates[0])
+
+	// Expects prom encode
+	require.Len(t, out.Parameters[3].Encode.Prom.Metrics, 1)
+	require.Equal(t, &api.PromEncode{
+		Port:   9102,
+		Prefix: "flp_",
+		Metrics: api.PromMetricsItems{{
+			Name:     "test_metric",
+			Type:     "gauge",
+			Filter:   api.PromMetricsFilter{Key: "", Value: ""},
+			ValueKey: "test_aggregates_value",
+			Labels:   []string{"by", "aggregate"},
+			Buckets:  []float64{},
+		}},
+	}, out.Parameters[3].Encode.Prom)
+}
+
+func Test_RunLongConfGen(t *testing.T) {
 	// Prepare
 	dirPath := "/tmp/getDefinitionFilesTest"
 	cg := NewConfGen(&Options{
@@ -154,7 +257,7 @@ func Test_RunConfGen(t *testing.T) {
 	})
 	err := os.MkdirAll(dirPath, 0755)
 	require.NoError(t, err)
-	err = os.WriteFile(filepath.Join(dirPath, configFileName), []byte(generalConfig), 0644)
+	err = os.WriteFile(filepath.Join(dirPath, configFileName), []byte(longConfig), 0644)
 	require.NoError(t, err)
 	err = os.WriteFile(filepath.Join(dirPath, "def.yaml"), []byte(networkDefs), 0644)
 	require.NoError(t, err)
@@ -173,6 +276,20 @@ func Test_RunConfGen(t *testing.T) {
 	var out Output
 	err = yaml.Unmarshal(destCfgBytes, &out)
 	require.NoError(t, err)
+	require.Len(t, out.Parameters, 6)
+	require.Len(t, out.Pipeline, 6)
+
+	// Pipeline structure
+	require.Equal(t,
+		[]config.Stage(
+			[]config.Stage{{Name: "ingest_collector"},
+				{Name: "transform_generic", Follows: "ingest_collector"},
+				{Name: "transform_network", Follows: "transform_generic"},
+				{Name: "extract_aggregate", Follows: "transform_network"},
+				{Name: "encode_prom", Follows: "extract_aggregate"},
+				{Name: "write_loki", Follows: "transform_network"}}),
+		out.Pipeline,
+	)
 
 	// Expects ingest
 	require.Equal(t, &api.IngestCollector{
@@ -216,4 +333,44 @@ func Test_RunConfGen(t *testing.T) {
 			Buckets:  []float64{},
 		}},
 	}, out.Parameters[4].Encode.Prom)
+
+	// Expects loki
+	require.Equal(t, &api.WriteLoki{
+		URL:          "http://loki:3100",
+		StaticLabels: model.LabelSet{"job": "flowlogs-pipeline"},
+	}, out.Parameters[5].Write.Loki)
+}
+
+func Test_GenerateTruncatedConfig(t *testing.T) {
+	// Prepare
+	cg := NewConfGen(&Options{
+		GenerateStages: []string{"extract_aggregate", "encode_prom"},
+	})
+	err := cg.ParseDefinition("defs", []byte(networkDefs))
+	require.NoError(t, err)
+
+	// Run
+	params := cg.GenerateTruncatedConfig()
+
+	require.Len(t, params, 2)
+	// Expects aggregates
+	require.Len(t, params[0].Extract.Aggregates, 1)
+	require.Equal(t, api.AggregateDefinition{
+		Name:      "test_aggregates",
+		By:        api.AggregateBy{"service"},
+		Operation: "sum",
+		RecordKey: "test_record_key",
+	}, params[0].Extract.Aggregates[0])
+
+	// Expects prom encode
+	require.Len(t, params[1].Encode.Prom.Metrics, 1)
+	require.Equal(t, &api.PromEncode{
+		Metrics: api.PromMetricsItems{{
+			Name:     "test_metric",
+			Type:     "gauge",
+			Filter:   api.PromMetricsFilter{Key: "", Value: ""},
+			ValueKey: "test_aggregates_value",
+			Labels:   []string{"by", "aggregate"},
+		}},
+	}, params[1].Encode.Prom)
 }
diff --git a/pkg/confgen/config.go b/pkg/confgen/config.go
index a894f3e2f..5d9d012c4 100644
--- a/pkg/confgen/config.go
+++ b/pkg/confgen/config.go
@@ -21,7 +21,7 @@ import (
 	"io/ioutil"
 	"os"
 
-	"github.com/netobserv/flowlogs-pipeline/pkg/api"
+	"github.com/netobserv/flowlogs-pipeline/pkg/config"
 	"github.com/pkg/errors"
 	log "github.com/sirupsen/logrus"
 	"gopkg.in/yaml.v2"
@@ -36,33 +36,16 @@ type Options struct {
 	GenerateStages []string
 }
 
-type ConfigIngest struct {
-	Collector api.IngestCollector `yaml:"collector"`
-}
-
-type ConfigTransform struct {
-	Generic api.TransformGeneric `yaml:"generic"`
-}
-
-type ConfigEncode struct {
-	Prom api.PromEncode `yaml:"prom"`
-}
-
-type ConfigWrite struct {
-	Loki api.WriteLoki `yaml:"loki"`
-	Type string        `yaml:"type"`
-}
-
 type ConfigVisualization struct {
 	Grafana ConfigVisualizationGrafana `yaml:"grafana"`
 }
 
 type Config struct {
 	Description   string              `yaml:"description"`
-	Ingest        ConfigIngest        `yaml:"ingest"`
-	Transform     ConfigTransform     `yaml:"transform"`
-	Write         ConfigWrite         `yaml:"write"`
-	Encode        ConfigEncode        `yaml:"encode"`
+	Ingest        config.Ingest       `yaml:"ingest"`
+	Transform     config.Transform    `yaml:"transform"`
+	Write         config.Write        `yaml:"write"`
+	Encode        config.Encode       `yaml:"encode"`
 	Visualization ConfigVisualization `yaml:"visualization"`
 }
diff --git a/pkg/confgen/config_test.go b/pkg/confgen/config_test.go
index a0792ae52..e40118f6f 100644
--- a/pkg/confgen/config_test.go
+++ b/pkg/confgen/config_test.go
@@ -22,20 +22,21 @@ import (
 	"testing"
 
 	"github.com/netobserv/flowlogs-pipeline/pkg/api"
+	"github.com/netobserv/flowlogs-pipeline/pkg/config"
 	"github.com/stretchr/testify/require"
 )
 
 func expectedConfig() *Config {
 	return &Config{
 		Description: "test description",
-		Encode: ConfigEncode{
-			Prom: api.PromEncode{
+		Encode: config.Encode{
+			Prom: &api.PromEncode{
 				Port:   7777,
 				Prefix: "prefix",
 			},
 		},
-		Ingest: ConfigIngest{
-			Collector: api.IngestCollector{
+		Ingest: config.Ingest{
+			Collector: &api.IngestCollector{
 				Port: 8888,
 			},
 		},
diff --git a/pkg/confgen/dedup.go b/pkg/confgen/dedup.go
index 0255468eb..e09d19953 100644
--- a/pkg/confgen/dedup.go
+++ b/pkg/confgen/dedup.go
@@ -25,7 +25,7 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
-func (cg *ConfGen) Dedupe() {
+func (cg *ConfGen) dedupe() {
 	cg.transformRules = dedupeNetworkTransformRules(cg.transformRules)
 	cg.aggregateDefinitions = dedupeAggregateDefinitions(cg.aggregateDefinitions)
 }
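
The switch from value fields (ConfigIngest, ConfigEncode, ...) to the shared config.Ingest/config.Encode types with pointer members (&api.PromEncode, &api.IngestCollector) is what lets an absent YAML section stay nil, which the generation code below relies on to skip stages. A minimal sketch of that behavior with local stand-in types (not the repo's), assuming yaml.v2 semantics:

package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Collector and Ingest are hypothetical stand-ins shaped like the repo's types.
type Collector struct {
	Port int `yaml:"port"`
}

type Ingest struct {
	Collector *Collector `yaml:"collector"`
	Kafka     *struct{}  `yaml:"kafka"`
}

func main() {
	var ing Ingest
	// Only "collector" is present in the document; "kafka" remains nil.
	if err := yaml.Unmarshal([]byte("collector:\n  port: 2155\n"), &ing); err != nil {
		panic(err)
	}
	fmt.Println(ing.Collector != nil, ing.Kafka != nil) // true false
}
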
"transform_generic", - }, - {Name: "extract_aggregate", - Follows: "transform_network", - }, - {Name: "encode_prom", - Follows: "extract_aggregate", - }, - {Name: "write_loki", - Follows: "transform_network", - }, - }, - Parameters: []config.StageParam{ - {Name: "ingest_collector", - Ingest: &config.Ingest{ - Type: "collector", - Collector: &api.IngestCollector{ - Port: cg.config.Ingest.Collector.Port, - PortLegacy: cg.config.Ingest.Collector.PortLegacy, - HostName: cg.config.Ingest.Collector.HostName, - }, - }, - }, - {Name: "transform_generic", - Transform: &config.Transform{ - Type: "generic", - Generic: &api.TransformGeneric{ - Policy: "replace_keys", - Rules: cg.config.Transform.Generic.Rules, - }, - }, - }, - {Name: "transform_network", - Transform: &config.Transform{ - Type: "network", - Network: &api.TransformNetwork{ - Rules: cg.transformRules, - }, - }, - }, - {Name: "extract_aggregate", - Extract: &config.Extract{ - Type: "aggregates", - Aggregates: cg.aggregateDefinitions, - }, - }, - {Name: "encode_prom", - Encode: &config.Encode{ - Type: "prom", - Prom: &api.PromEncode{ - Port: cg.config.Encode.Prom.Port, - Prefix: cg.config.Encode.Prom.Prefix, - Metrics: cg.promMetrics, - }, - }, - }, - {Name: "write_loki", - Write: &config.Write{ - Type: cg.config.Write.Type, - Loki: &cg.config.Write.Loki, - }, - }, - }, +func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() map[string]interface{} { + pipeline, _ := config.NewPipeline("ingest_collector", &cg.config.Ingest) + next := pipeline + if cg.config.Transform.Generic != nil { + gen := *cg.config.Transform.Generic + if len(gen.Policy) == 0 { + gen.Policy = "replace_keys" + } + next = next.TransformGeneric("transform_generic", gen) + } + if len(cg.transformRules) > 0 { + next = next.TransformNetwork("transform_network", api.TransformNetwork{ + Rules: cg.transformRules, + }) + } + if len(cg.aggregateDefinitions) > 0 { + agg := next.Aggregate("extract_aggregate", cg.aggregateDefinitions) + agg.EncodePrometheus("encode_prom", api.PromEncode{ + Port: cg.config.Encode.Prom.Port, + Prefix: cg.config.Encode.Prom.Prefix, + Metrics: cg.promMetrics, + }) + } + if cg.config.Write.Loki != nil { + next.WriteLoki("write_loki", *cg.config.Write.Loki) + } + return map[string]interface{}{ + "log-level": "error", + "pipeline": pipeline.GetStages(), + "parameters": pipeline.GetStageParams(), } - return configStruct } -func (cg *ConfGen) GenerateTruncatedConfig(stages []string) config.ConfigFileStruct { - parameters := make([]config.StageParam, len(stages)) - for i, stage := range stages { +func (cg *ConfGen) GenerateTruncatedConfig() []config.StageParam { + parameters := make([]config.StageParam, len(cg.opts.GenerateStages)) + for i, stage := range cg.opts.GenerateStages { switch stage { case "ingest": - parameters[i] = config.StageParam{ - Name: "ingest_collector", - Ingest: &config.Ingest{ - Type: "collector", - Collector: &api.IngestCollector{ - Port: cg.config.Ingest.Collector.Port, - PortLegacy: cg.config.Ingest.Collector.PortLegacy, - HostName: cg.config.Ingest.Collector.HostName, - }, - }, - } + parameters[i] = config.NewCollectorParams("ingest_collector", *cg.config.Ingest.Collector) case "transform_generic": - parameters[i] = config.StageParam{ - Name: "transform_generic", - Transform: &config.Transform{ - Type: "generic", - Generic: &api.TransformGeneric{ - Policy: "replace_keys", - Rules: cg.config.Transform.Generic.Rules, - }, - }, - } + parameters[i] = config.NewTransformGenericParams("transform_generic", *cg.config.Transform.Generic) case 
"transform_network": - parameters[i] = config.StageParam{ - Name: "transform_network", - Transform: &config.Transform{ - Type: "network", - Network: &api.TransformNetwork{ - Rules: cg.transformRules, - }, - }, - } + parameters[i] = config.NewTransformNetworkParams("transform_network", *cg.config.Transform.Network) case "extract_aggregate": - parameters[i] = config.StageParam{ - Name: "extract_aggregate", - Extract: &config.Extract{ - Type: "aggregates", - Aggregates: cg.aggregateDefinitions, - }, - } + parameters[i] = config.NewAggregateParams("extract_aggregate", cg.aggregateDefinitions) case "encode_prom": - parameters[i] = config.StageParam{ - Name: "encode_prom", - Encode: &config.Encode{ - Type: "prom", - Prom: &api.PromEncode{ - Port: cg.config.Encode.Prom.Port, - Prefix: cg.config.Encode.Prom.Prefix, - Metrics: cg.promMetrics, - }, - }, - } + parameters[i] = config.NewEncodePrometheusParams("encode_prom", api.PromEncode{ + Metrics: cg.promMetrics, + }) case "write_loki": - parameters[i] = config.StageParam{ - Name: "write_loki", - Write: &config.Write{ - Type: cg.config.Write.Type, - Loki: &cg.config.Write.Loki, - }, - } + parameters[i] = config.NewWriteLokiParams("write_loki", *cg.config.Write.Loki) } } log.Debugf("parameters = %v \n", parameters) - configStruct := config.ConfigFileStruct{ - Parameters: parameters, - } - return configStruct + return parameters } -func (cg *ConfGen) writeConfigFile(fileName string, config config.ConfigFileStruct) error { - configData, err := yaml.Marshal(&config) +func (cg *ConfGen) writeConfigFile(fileName string, config interface{}) error { + configData, err := yaml.Marshal(config) if err != nil { return err } diff --git a/pkg/config/config.go b/pkg/config/config.go index 3c26d9f12..356ecb422 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -40,12 +40,6 @@ type Health struct { Port string } -type ConfigFileStruct struct { - LogLevel string `yaml:"log-level,omitempty" json:"log-level,omitempty"` - Pipeline []Stage `yaml:"pipeline,omitempty" json:"pipeline,omitempty"` - Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"` -} - type Stage struct { Name string `yaml:"name" json:"name"` Follows string `yaml:"follows,omitempty" json:"follows,omitempty"` diff --git a/pkg/config/pipeline_builder.go b/pkg/config/pipeline_builder.go index 35430cd44..49f20398a 100644 --- a/pkg/config/pipeline_builder.go +++ b/pkg/config/pipeline_builder.go @@ -18,6 +18,8 @@ package config import ( + "errors" + "github.com/netobserv/flowlogs-pipeline/pkg/api" ) @@ -45,11 +47,25 @@ type PipelineBuilderStage struct { pipeline *pipeline } +// NewPipeline creates a new pipeline from an existing ingest +func NewPipeline(name string, ingest *Ingest) (PipelineBuilderStage, error) { + if ingest.Collector != nil { + return NewCollectorPipeline(name, *ingest.Collector), nil + } + if ingest.GRPC != nil { + return NewGRPCPipeline(name, *ingest.GRPC), nil + } + if ingest.Kafka != nil { + return NewKafkaPipeline(name, *ingest.Kafka), nil + } + return PipelineBuilderStage{}, errors.New("Missing ingest params") +} + // NewCollectorPipeline creates a new pipeline from an `IngestCollector` initial stage (listening for NetFlows / IPFIX) func NewCollectorPipeline(name string, ingest api.IngestCollector) PipelineBuilderStage { p := pipeline{ stages: []Stage{{Name: name}}, - config: []StageParam{{Name: name, Ingest: &Ingest{Type: api.CollectorType, Collector: &ingest}}}, + config: []StageParam{NewCollectorParams(name, ingest)}, } return 
diff --git a/pkg/config/pipeline_builder.go b/pkg/config/pipeline_builder.go
index 35430cd44..49f20398a 100644
--- a/pkg/config/pipeline_builder.go
+++ b/pkg/config/pipeline_builder.go
@@ -18,6 +18,8 @@
 package config
 
 import (
+	"errors"
+
 	"github.com/netobserv/flowlogs-pipeline/pkg/api"
 )
 
@@ -45,11 +47,25 @@ type PipelineBuilderStage struct {
 	pipeline *pipeline
 }
 
+// NewPipeline creates a new pipeline from an existing ingest
+func NewPipeline(name string, ingest *Ingest) (PipelineBuilderStage, error) {
+	if ingest.Collector != nil {
+		return NewCollectorPipeline(name, *ingest.Collector), nil
+	}
+	if ingest.GRPC != nil {
+		return NewGRPCPipeline(name, *ingest.GRPC), nil
+	}
+	if ingest.Kafka != nil {
+		return NewKafkaPipeline(name, *ingest.Kafka), nil
+	}
+	return PipelineBuilderStage{}, errors.New("Missing ingest params")
+}
+
 // NewCollectorPipeline creates a new pipeline from an `IngestCollector` initial stage (listening for NetFlows / IPFIX)
 func NewCollectorPipeline(name string, ingest api.IngestCollector) PipelineBuilderStage {
 	p := pipeline{
 		stages: []Stage{{Name: name}},
-		config: []StageParam{{Name: name, Ingest: &Ingest{Type: api.CollectorType, Collector: &ingest}}},
+		config: []StageParam{NewCollectorParams(name, ingest)},
 	}
 	return PipelineBuilderStage{pipeline: &p, lastStage: name}
 }
@@ -58,7 +74,7 @@ func NewCollectorPipeline(name string, ingest api.IngestCollector) PipelineBuild
 func NewGRPCPipeline(name string, ingest api.IngestGRPCProto) PipelineBuilderStage {
 	p := pipeline{
 		stages: []Stage{{Name: name}},
-		config: []StageParam{{Name: name, Ingest: &Ingest{Type: api.GRPCType, GRPC: &ingest}}},
+		config: []StageParam{NewGRPCParams(name, ingest)},
 	}
 	return PipelineBuilderStage{pipeline: &p, lastStage: name}
 }
@@ -67,7 +83,7 @@ func NewGRPCPipeline(name string, ingest api.IngestGRPCProto) PipelineBuilderSta
 func NewKafkaPipeline(name string, ingest api.IngestKafka) PipelineBuilderStage {
 	p := pipeline{
 		stages: []Stage{{Name: name}},
-		config: []StageParam{{Name: name, Ingest: &Ingest{Type: api.KafkaType, Kafka: &ingest}}},
+		config: []StageParam{NewKafkaParams(name, ingest)},
 	}
 	return PipelineBuilderStage{pipeline: &p, lastStage: name}
 }
@@ -80,22 +96,22 @@ func (b *PipelineBuilderStage) next(name string, param StageParam) PipelineBuild
 
 // Aggregate chains the current stage with an aggregate stage and returns that new stage
 func (b *PipelineBuilderStage) Aggregate(name string, aggs []api.AggregateDefinition) PipelineBuilderStage {
-	return b.next(name, StageParam{Name: name, Extract: &Extract{Type: api.AggregateType, Aggregates: aggs}})
+	return b.next(name, NewAggregateParams(name, aggs))
 }
 
 // TransformGeneric chains the current stage with a TransformGeneric stage and returns that new stage
 func (b *PipelineBuilderStage) TransformGeneric(name string, gen api.TransformGeneric) PipelineBuilderStage {
-	return b.next(name, StageParam{Name: name, Transform: &Transform{Type: api.GenericType, Generic: &gen}})
+	return b.next(name, NewTransformGenericParams(name, gen))
 }
 
 // TransformFilter chains the current stage with a TransformFilter stage and returns that new stage
 func (b *PipelineBuilderStage) TransformFilter(name string, filter api.TransformFilter) PipelineBuilderStage {
-	return b.next(name, StageParam{Name: name, Transform: &Transform{Type: api.FilterType, Filter: &filter}})
+	return b.next(name, NewTransformFilterParams(name, filter))
 }
 
 // TransformNetwork chains the current stage with a TransformNetwork stage and returns that new stage
 func (b *PipelineBuilderStage) TransformNetwork(name string, nw api.TransformNetwork) PipelineBuilderStage {
-	return b.next(name, StageParam{Name: name, Transform: &Transform{Type: api.NetworkType, Network: &nw}})
+	return b.next(name, NewTransformNetworkParams(name, nw))
 }
 
 // ConnTrack chains the current stage with a ConnTrack stage and returns that new stage
@@ -105,22 +121,22 @@ func (b *PipelineBuilderStage) ConnTrack(name string, ct api.ConnTrack) Pipeline
 
 // EncodePrometheus chains the current stage with a PromEncode stage (to expose metrics in Prometheus format) and returns that new stage
 func (b *PipelineBuilderStage) EncodePrometheus(name string, prom api.PromEncode) PipelineBuilderStage {
-	return b.next(name, StageParam{Name: name, Encode: &Encode{Type: api.PromType, Prom: &prom}})
+	return b.next(name, NewEncodePrometheusParams(name, prom))
 }
 
 // EncodeKafka chains the current stage with an EncodeKafka stage (writing to a Kafka topic) and returns that new stage
 func (b *PipelineBuilderStage) EncodeKafka(name string, kafka api.EncodeKafka) PipelineBuilderStage {
-	return b.next(name, StageParam{Name: name, Encode: &Encode{Type: api.KafkaType, Kafka: &kafka}})
+	return b.next(name, NewEncodeKafkaParams(name, kafka))
 }
 
 // WriteStdout chains the current stage with a WriteStdout stage and returns that new stage
 func (b *PipelineBuilderStage) WriteStdout(name string, stdout api.WriteStdout) PipelineBuilderStage {
-	return b.next(name, StageParam{Name: name, Write: &Write{Type: api.StdoutType, Stdout: &stdout}})
+	return b.next(name, NewWriteStdoutParams(name, stdout))
 }
 
 // WriteLoki chains the current stage with a WriteLoki stage and returns that new stage
 func (b *PipelineBuilderStage) WriteLoki(name string, loki api.WriteLoki) PipelineBuilderStage {
-	return b.next(name, StageParam{Name: name, Write: &Write{Type: api.LokiType, Loki: &loki}})
+	return b.next(name, NewWriteLokiParams(name, loki))
 }
 
 // GetStages returns the current pipeline stages. It can be called from any of the stages, they share the same pipeline reference.
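
NewPipeline dispatches on whichever ingest pointer is set, which is how confgen starts a builder chain straight from its parsed config. A usage sketch with illustrative values:

package main

import (
	"fmt"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/flowlogs-pipeline/pkg/config"
)

func main() {
	ingest := config.Ingest{
		Type:      api.CollectorType,
		Collector: &api.IngestCollector{Port: 2155},
	}
	p, err := config.NewPipeline("ingest_collector", &ingest)
	if err != nil {
		// Returned when none of Collector/GRPC/Kafka is set.
		panic(err)
	}
	p.WriteStdout("write_stdout", api.WriteStdout{})
	fmt.Println(len(p.GetStageParams())) // 2: ingest + write
}
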
diff --git a/pkg/config/stage_params.go b/pkg/config/stage_params.go
new file mode 100644
index 000000000..fc7641eda
--- /dev/null
+++ b/pkg/config/stage_params.go
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2021 IBM, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package config
+
+import (
+	"github.com/netobserv/flowlogs-pipeline/pkg/api"
+)
+
+func NewCollectorParams(name string, ingest api.IngestCollector) StageParam {
+	return StageParam{Name: name, Ingest: &Ingest{Type: api.CollectorType, Collector: &ingest}}
+}
+
+func NewGRPCParams(name string, ingest api.IngestGRPCProto) StageParam {
+	return StageParam{Name: name, Ingest: &Ingest{Type: api.GRPCType, GRPC: &ingest}}
+}
+
+func NewKafkaParams(name string, ingest api.IngestKafka) StageParam {
+	return StageParam{Name: name, Ingest: &Ingest{Type: api.KafkaType, Kafka: &ingest}}
+}
+
+func NewAggregateParams(name string, aggs []api.AggregateDefinition) StageParam {
+	return StageParam{Name: name, Extract: &Extract{Type: api.AggregateType, Aggregates: aggs}}
+}
+
+func NewTransformGenericParams(name string, gen api.TransformGeneric) StageParam {
+	return StageParam{Name: name, Transform: &Transform{Type: api.GenericType, Generic: &gen}}
+}
+
+func NewTransformFilterParams(name string, filter api.TransformFilter) StageParam {
+	return StageParam{Name: name, Transform: &Transform{Type: api.FilterType, Filter: &filter}}
+}
+
+func NewTransformNetworkParams(name string, nw api.TransformNetwork) StageParam {
+	return StageParam{Name: name, Transform: &Transform{Type: api.NetworkType, Network: &nw}}
+}
+
+func NewEncodePrometheusParams(name string, prom api.PromEncode) StageParam {
+	return StageParam{Name: name, Encode: &Encode{Type: api.PromType, Prom: &prom}}
+}
+
+func NewEncodeKafkaParams(name string, kafka api.EncodeKafka) StageParam {
+	return StageParam{Name: name, Encode: &Encode{Type: api.KafkaType, Kafka: &kafka}}
+}
+
+func NewWriteStdoutParams(name string, stdout api.WriteStdout) StageParam {
+	return StageParam{Name: name, Write: &Write{Type: api.StdoutType, Stdout: &stdout}}
+}
+
+func NewWriteLokiParams(name string, loki api.WriteLoki) StageParam {
+	return StageParam{Name: name, Write: &Write{Type: api.LokiType, Loki: &loki}}
+}
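
These New*Params constructors become the single place where a stage name is paired with its type tag, shared by the pipeline builder and by GenerateTruncatedConfig. A sketch assembling parameters directly, mirroring what Test_GenerateTruncatedConfig asserts (metric and aggregate values are illustrative):

package main

import (
	"fmt"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/flowlogs-pipeline/pkg/config"
)

func main() {
	params := []config.StageParam{
		config.NewAggregateParams("extract_aggregate", []api.AggregateDefinition{{
			Name:      "test_aggregates",
			By:        api.AggregateBy{"service"},
			Operation: "sum",
		}}),
		config.NewEncodePrometheusParams("encode_prom", api.PromEncode{
			Metrics: api.PromMetricsItems{{Name: "test_metric", Type: "gauge"}},
		}),
	}
	// The constructors fill in the type tags, so callers never hand-write them.
	fmt.Println(params[0].Extract.Type, params[1].Encode.Type) // aggregates prom
}
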