Skip to content

Commit

Permalink
Use main model in confgen, use pipeline builder
Browse files Browse the repository at this point in the history
Fixes netobserv#254

- Removed duplicated confgen model
- Use PipelineBuilder to simplify pipeline generation
- Make confgen easier to consume as a lib (e.g. for NOO)
  - Do not make it necessary to call "Run": parsing definition files
    should be sufficient
  - Do not make it necessary to work with files written on disk: work
    with []byte instead
- SkipWithTags should not result in an error when a file is skipped
- Add confgen tests
  • Loading branch information
jotak committed Jul 26, 2022
1 parent f9f282a commit 2f037bd
Show file tree
Hide file tree
Showing 9 changed files with 353 additions and 242 deletions.
57 changes: 22 additions & 35 deletions pkg/confgen/confgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,24 @@ func (cg *ConfGen) Run() error {
return err
}

definitionFiles := cg.GetDefinitionFiles(cg.opts.SrcFolder)
definitionFiles := getDefinitionFiles(cg.opts.SrcFolder)
for _, definitionFile := range definitionFiles {
err := cg.parseFile(definitionFile)
b, err := ioutil.ReadFile(definitionFile)
if err != nil {
log.Debugf("cg.parseFile err: %v ", err)
log.Debugf("ioutil.ReadFile err: %v ", err)
continue
}
err = cg.ParseDefinition(definitionFile, b)
if err != nil {
log.Debugf("cg.parseDefinition err: %v ", err)
continue
}
}

cg.Dedupe()
cg.dedupe()

if len(cg.opts.GenerateStages) != 0 {
config := cg.GenerateTruncatedConfig(cg.opts.GenerateStages)
config := cg.GenerateTruncatedConfig()
err = cg.writeConfigFile(cg.opts.DestConfFile, config)
if err != nil {
log.Debugf("cg.GenerateTruncatedConfig err: %v ", err)
Expand Down Expand Up @@ -121,62 +126,44 @@ func (cg *ConfGen) Run() error {
return nil
}

func (cg *ConfGen) checkHeader(fileName string) error {
// check header
f, err := os.OpenFile(fileName, os.O_RDONLY, 0644)
if err != nil {
log.Debugf("os.OpenFile error: %v ", err)
return err
}
func checkHeader(bytes []byte) error {
header := make([]byte, len(definitionHeader))
_, err = f.Read(header)
if err != nil || string(header) != definitionHeader {
log.Debugf("Wrong header file: %s ", fileName)
copy(header, bytes)
if string(header) != definitionHeader {
return fmt.Errorf("wrong header")
}
err = f.Close()
if err != nil {
log.Debugf("f.Close err: %v ", err)
return err
}

return nil
}

func (cg *ConfGen) parseFile(fileName string) error {

func (cg *ConfGen) ParseDefinition(name string, bytes []byte) error {
// check header
err := cg.checkHeader(fileName)
err := checkHeader(bytes)
if err != nil {
log.Debugf("cg.checkHeader err: %v ", err)
log.Debugf("%s cg.checkHeader err: %v ", name, err)
return err
}

// parse yaml
var defFile DefFile
yamlFile, err := ioutil.ReadFile(fileName)
if err != nil {
log.Debugf("ioutil.ReadFile err: %v ", err)
return err
}
err = yaml.Unmarshal(yamlFile, &defFile)
err = yaml.Unmarshal(bytes, &defFile)
if err != nil {
log.Debugf("yaml.Unmarshal err: %v ", err)
log.Debugf("%s yaml.Unmarshal err: %v ", name, err)
return err
}

//skip if their skip tag match
for _, skipTag := range cg.opts.SkipWithTags {
for _, tag := range defFile.Tags {
if skipTag == tag {
return fmt.Errorf("skipping definition %s due to skip tag %s", fileName, tag)
log.Infof("skipping definition %s due to skip tag %s", name, tag)
return nil
}
}
}

// parse definition
definition := Definition{
FileName: fileName,
FileName: name,
Description: defFile.Description,
Details: defFile.Details,
Usage: defFile.Usage,
Expand Down Expand Up @@ -216,7 +203,7 @@ func (cg *ConfGen) parseFile(fileName string) error {
return nil
}

func (*ConfGen) GetDefinitionFiles(rootPath string) []string {
func getDefinitionFiles(rootPath string) []string {

var files []string

Expand Down
207 changes: 182 additions & 25 deletions pkg/confgen/confgen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,22 @@ import (

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
)

func Test_checkHeader(t *testing.T) {
filename := "/tmp/header.check.txt"
fakeFilename := "/tmp/fake_file.does.exist"
wrongHeader := "#wrong_confgen"
cg := NewConfGen(&Options{})
err := cg.checkHeader(fakeFilename)
require.Error(t, err)

err = os.WriteFile(filename, []byte(wrongHeader), 0644)
require.NoError(t, err)
err = cg.checkHeader(filename)
err := checkHeader([]byte(wrongHeader))
require.Error(t, err)

err = os.WriteFile(filename, []byte(definitionHeader), 0644)
require.NoError(t, err)
err = cg.checkHeader(filename)
err = checkHeader([]byte(definitionHeader))
require.NoError(t, err)
}

const generalConfig = `#flp_confgen
const shortConfig = `#flp_confgen
description:
test description
ingest:
Expand All @@ -73,6 +65,41 @@ visualization:
schemaVersion: "16"
`

const longConfig = `#flp_confgen
description:
test description
ingest:
collector:
port: 2155
portLegacy: 2156
hostName: 0.0.0.0
transform:
generic:
rules:
- input: SrcAddr
output: srcIP
encode:
type: prom
prom:
prefix: flp_
port: 9102
write:
type: loki
loki:
url: http://loki:3100
staticLabels:
job: flowlogs-pipeline
visualization:
type: grafana
grafana:
dashboards:
- name: "details"
title: "Flow-Logs to Metrics - Details"
time_from: "now-15m"
tags: "['flp','grafana','dashboard','details']"
schemaVersion: "16"
`

const networkDefs = `#flp_confgen
description:
test description
Expand Down Expand Up @@ -116,34 +143,110 @@ visualization:
Test grafana title
`

func Test_parseFile(t *testing.T) {
fakeFilename := "/tmp/fake_file.does.exist"
filename := "/tmp/parse_file.check.txt"
func Test_ParseDefinition(t *testing.T) {
cg := NewConfGen(&Options{})
err := cg.parseFile(fakeFilename)
require.Error(t, err)

err = os.WriteFile(filename, []byte(networkDefs), 0644)
require.NoError(t, err)
err = cg.parseFile(filename)
err := cg.ParseDefinition("def", []byte(networkDefs))
require.NoError(t, err)
}

func Test_getDefinitionFiles(t *testing.T) {
dirPath := "/tmp/getDefinitionFilesTest"
filename := "/def.yaml"
cg := NewConfGen(&Options{})
err := os.MkdirAll(dirPath, 0755)
require.NoError(t, err)
err = os.WriteFile(filepath.Join(dirPath, filename), []byte(networkDefs), 0644)
require.NoError(t, err)
files := cg.GetDefinitionFiles(dirPath)
files := getDefinitionFiles(dirPath)
require.Equal(t, 1, len(files))
expected := []string{path.Join(dirPath, filename)}
require.ElementsMatch(t, expected, files)
}

func Test_RunConfGen(t *testing.T) {
func Test_RunShortConfGen(t *testing.T) {
// Prepare
dirPath := "/tmp/getDefinitionFilesTest"
cg := NewConfGen(&Options{
SrcFolder: dirPath,
DestConfFile: "/tmp/destConfigTest",
DestDocFile: "/tmp/destDocTest",
DestGrafanaJsonnetFolder: "/tmp/destJsonnetTest",
})
err := os.MkdirAll(dirPath, 0755)
require.NoError(t, err)
err = os.WriteFile(filepath.Join(dirPath, configFileName), []byte(shortConfig), 0644)
require.NoError(t, err)
err = os.WriteFile(filepath.Join(dirPath, "def.yaml"), []byte(networkDefs), 0644)
require.NoError(t, err)

// Run
err = cg.Run()
require.NoError(t, err)

// Unmarshal output
type Output struct {
Pipeline []config.Stage `yaml:"pipeline"`
Parameters []config.StageParam `yaml:"parameters"`
}
destCfgBytes, err := ioutil.ReadFile("/tmp/destConfigTest")
require.NoError(t, err)
var out Output
err = yaml.Unmarshal(destCfgBytes, &out)
require.NoError(t, err)
require.Len(t, out.Pipeline, 4)
require.Len(t, out.Parameters, 4)

// Pipeline structure
require.Equal(t,
[]config.Stage(
[]config.Stage{{Name: "ingest_collector"},
{Name: "transform_network", Follows: "ingest_collector"},
{Name: "extract_aggregate", Follows: "transform_network"},
{Name: "encode_prom", Follows: "extract_aggregate"}}),
out.Pipeline,
)

// Expects ingest
require.Equal(t, &api.IngestCollector{
HostName: "0.0.0.0",
Port: 2155,
PortLegacy: 2156,
}, out.Parameters[0].Ingest.Collector)

// Expects transform network
require.Len(t, out.Parameters[1].Transform.Network.Rules, 1)
require.Equal(t, api.NetworkTransformRule{
Input: "testInput",
Output: "testOutput",
Type: "add_service",
Parameters: "proto",
}, out.Parameters[1].Transform.Network.Rules[0])

// Expects aggregates
require.Len(t, out.Parameters[2].Extract.Aggregates, 1)
require.Equal(t, api.AggregateDefinition{
Name: "test_aggregates",
By: api.AggregateBy{"service"},
Operation: "sum",
RecordKey: "test_record_key",
}, out.Parameters[2].Extract.Aggregates[0])

// Expects prom encode
require.Len(t, out.Parameters[3].Encode.Prom.Metrics, 1)
require.Equal(t, &api.PromEncode{
Port: 9102,
Prefix: "flp_",
Metrics: api.PromMetricsItems{{
Name: "test_metric",
Type: "gauge",
Filter: api.PromMetricsFilter{Key: "", Value: ""},
ValueKey: "test_aggregates_value",
Labels: []string{"by", "aggregate"},
Buckets: []float64{},
}},
}, out.Parameters[3].Encode.Prom)
}

func Test_RunLongConfGen(t *testing.T) {
// Prepare
dirPath := "/tmp/getDefinitionFilesTest"
cg := NewConfGen(&Options{
Expand All @@ -154,7 +257,7 @@ func Test_RunConfGen(t *testing.T) {
})
err := os.MkdirAll(dirPath, 0755)
require.NoError(t, err)
err = os.WriteFile(filepath.Join(dirPath, configFileName), []byte(generalConfig), 0644)
err = os.WriteFile(filepath.Join(dirPath, configFileName), []byte(longConfig), 0644)
require.NoError(t, err)
err = os.WriteFile(filepath.Join(dirPath, "def.yaml"), []byte(networkDefs), 0644)
require.NoError(t, err)
Expand All @@ -173,6 +276,20 @@ func Test_RunConfGen(t *testing.T) {
var out Output
err = yaml.Unmarshal(destCfgBytes, &out)
require.NoError(t, err)
require.Len(t, out.Parameters, 6)
require.Len(t, out.Pipeline, 6)

// Pipeline structure
require.Equal(t,
[]config.Stage(
[]config.Stage{{Name: "ingest_collector"},
{Name: "transform_generic", Follows: "ingest_collector"},
{Name: "transform_network", Follows: "transform_generic"},
{Name: "extract_aggregate", Follows: "transform_network"},
{Name: "encode_prom", Follows: "extract_aggregate"},
{Name: "write_loki", Follows: "transform_network"}}),
out.Pipeline,
)

// Expects ingest
require.Equal(t, &api.IngestCollector{
Expand Down Expand Up @@ -216,4 +333,44 @@ func Test_RunConfGen(t *testing.T) {
Buckets: []float64{},
}},
}, out.Parameters[4].Encode.Prom)

// Expects loki
require.Equal(t, &api.WriteLoki{
URL: "http://loki:3100",
StaticLabels: model.LabelSet{"job": "flowlogs-pipeline"},
}, out.Parameters[5].Write.Loki)
}

func Test_GenerateTruncatedConfig(t *testing.T) {
// Prepare
cg := NewConfGen(&Options{
GenerateStages: []string{"extract_aggregate", "encode_prom"},
})
err := cg.ParseDefinition("defs", []byte(networkDefs))
require.NoError(t, err)

// Run
params := cg.GenerateTruncatedConfig()

require.Len(t, params, 2)
// Expects aggregates
require.Len(t, params[0].Extract.Aggregates, 1)
require.Equal(t, api.AggregateDefinition{
Name: "test_aggregates",
By: api.AggregateBy{"service"},
Operation: "sum",
RecordKey: "test_record_key",
}, params[0].Extract.Aggregates[0])

// Expects prom encode
require.Len(t, params[1].Encode.Prom.Metrics, 1)
require.Equal(t, &api.PromEncode{
Metrics: api.PromMetricsItems{{
Name: "test_metric",
Type: "gauge",
Filter: api.PromMetricsFilter{Key: "", Value: ""},
ValueKey: "test_aggregates_value",
Labels: []string{"by", "aggregate"},
}},
}, params[1].Encode.Prom)
}
Loading

0 comments on commit 2f037bd

Please sign in to comment.