Skip to content

Commit

Permalink
Merge confgen & main models (#254) (#267)
Browse files Browse the repository at this point in the history
* Confgen: init from options (avoid many globals)

* Use main model in confgen, use pipeline builder

Fixes #254

- Removed duplicated confgen model
- Use PipelineBuilder to simplify pipeline generation
- Make confgen easier to consume as a lib (e.g. for NOO)
  - Do not make it necessary to call "Run": parsing definition files
    should be sufficient
  - Do not make it necessary to work with files written on disk: work
    with []byte instead
- SkipWithTags should not result in an error when a file is skipped
- Add confgen tests

* Avoid using globals

A side-effect of removing globals in write_loki is that it changes how
the config is read, and set with defaults. Instead of unmarshaling a
second time to automatically get defaults, we now call an explicit function
that sets the default. Also, now removing loki URL default, it now has
to be set explicitely

* Update ConnTrack builder

* More defer cleanup in tests, and use ioutils / temp dir/files

Also fixed jsonnet dir actually used as filename prefix rather than
directory

* User ConfigFileStruct in confgen

* Update pkg/config/config.go

Co-authored-by: Ronen Schaffer <ronen.schaffer@ibm.com>

* Use config.ConfigFileStruct in tests

Co-authored-by: Ronen Schaffer <ronen.schaffer@ibm.com>
  • Loading branch information
jotak and ronensc committed Jul 29, 2022
1 parent cba9b51 commit bfbd89b
Show file tree
Hide file tree
Showing 37 changed files with 660 additions and 479 deletions.
28 changes: 12 additions & 16 deletions cmd/confgenerator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var (
logLevel string
envPrefix = "FLP_CONFGEN"
defaultLogFileName = ".confgen"
opts confgen.Options
)

// rootCmd represents the root command
Expand Down Expand Up @@ -92,8 +93,8 @@ func initLogger() {
log.SetFormatter(&log.TextFormatter{DisableColors: false, FullTimestamp: true})
}

func dumpConfig() {
configAsJSON, _ := json.MarshalIndent(confgen.Opt, "", "\t")
func dumpConfig(opts *confgen.Options) {
configAsJSON, _ := json.MarshalIndent(opts, "", "\t")
log.Infof("configuration:\n%s\n", configAsJSON)
}

Expand Down Expand Up @@ -128,12 +129,12 @@ func initFlags() {

rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", fmt.Sprintf("config file (default is $HOME/%s)", defaultLogFileName))
rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "error", "Log level: debug, info, warning, error")
rootCmd.PersistentFlags().StringVar(&confgen.Opt.SrcFolder, "srcFolder", "network_definitions", "source folder")
rootCmd.PersistentFlags().StringVar(&confgen.Opt.DestConfFile, "destConfFile", "/tmp/flowlogs-pipeline.conf.yaml", "destination configuration file")
rootCmd.PersistentFlags().StringVar(&confgen.Opt.DestDocFile, "destDocFile", "/tmp/metrics.md", "destination documentation file (.md)")
rootCmd.PersistentFlags().StringVar(&confgen.Opt.DestGrafanaJsonnetFolder, "destGrafanaJsonnetFolder", "/tmp/jsonnet", "destination grafana jsonnet folder")
rootCmd.PersistentFlags().StringSliceVar(&confgen.Opt.SkipWithTags, "skipWithTags", nil, "Skip definitions with Tags")
rootCmd.PersistentFlags().StringSliceVar(&confgen.Opt.GenerateStages, "generateStages", nil, "Produce only specified stages (ingest, transform_generic, transform_network, extract_aggregate, encode_prom, write_loki")
rootCmd.PersistentFlags().StringVar(&opts.SrcFolder, "srcFolder", "network_definitions", "source folder")
rootCmd.PersistentFlags().StringVar(&opts.DestConfFile, "destConfFile", "/tmp/flowlogs-pipeline.conf.yaml", "destination configuration file")
rootCmd.PersistentFlags().StringVar(&opts.DestDocFile, "destDocFile", "/tmp/metrics.md", "destination documentation file (.md)")
rootCmd.PersistentFlags().StringVar(&opts.DestGrafanaJsonnetFolder, "destGrafanaJsonnetFolder", "/tmp/jsonnet", "destination grafana jsonnet folder")
rootCmd.PersistentFlags().StringSliceVar(&opts.SkipWithTags, "skipWithTags", nil, "Skip definitions with Tags")
rootCmd.PersistentFlags().StringSliceVar(&opts.GenerateStages, "generateStages", nil, "Produce only specified stages (ingest, transform_generic, transform_network, extract_aggregate, encode_prom, write_loki")
}

func main() {
Expand All @@ -150,15 +151,10 @@ func run() {
fmt.Printf("Starting %s:\n=====\nBuild Version: %s\nBuild Date: %s\n\n",
filepath.Base(os.Args[0]), BuildVersion, BuildDate)
// Dump the configuration
dumpConfig()
dumpConfig(&opts)
// creating a new configuration generator
confGen, err := confgen.NewConfGen()
if err != nil {
log.Fatalf("failed to initialize NewConfGen %s", err)
os.Exit(1)
}

err = confGen.Run()
confGen := confgen.NewConfGen(&opts)
err := confGen.Run()
if err != nil {
log.Fatalf("failed to initialize NewConfGen %s", err)
os.Exit(1)
Expand Down
19 changes: 10 additions & 9 deletions cmd/flowlogs-pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
logLevel string
envPrefix = "FLOWLOGS-PIPILNE"
defaultLogFileName = ".flowlogs-pipeline"
opts config.Options
)

// rootCmd represents the root command
Expand Down Expand Up @@ -98,8 +99,8 @@ func initLogger() {
log.SetFormatter(&log.TextFormatter{DisableColors: false, FullTimestamp: true, PadLevelText: true, DisableQuote: true})
}

func dumpConfig() {
configAsJSON, _ := json.MarshalIndent(config.Opt, "", " ")
func dumpConfig(opts config.Options) {
configAsJSON, _ := json.MarshalIndent(opts, "", " ")
fmt.Printf("Using configuration:\n%s\n", configAsJSON)
}

Expand Down Expand Up @@ -133,9 +134,9 @@ func initFlags() {
cobra.OnInitialize(initConfig)
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", fmt.Sprintf("config file (default is $HOME/%s)", defaultLogFileName))
rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "error", "Log level: debug, info, warning, error")
rootCmd.PersistentFlags().StringVar(&config.Opt.Health.Port, "health.port", "8080", "Health server port")
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine, "pipeline", "", "json of config file pipeline field")
rootCmd.PersistentFlags().StringVar(&config.Opt.Parameters, "parameters", "", "json of config file parameters field")
rootCmd.PersistentFlags().StringVar(&opts.Health.Port, "health.port", "8080", "Health server port")
rootCmd.PersistentFlags().StringVar(&opts.PipeLine, "pipeline", "", "json of config file pipeline field")
rootCmd.PersistentFlags().StringVar(&opts.Parameters, "parameters", "", "json of config file parameters field")
}

func main() {
Expand All @@ -159,9 +160,9 @@ func run() {
filepath.Base(os.Args[0]), BuildVersion, BuildDate)

// Dump configuration
dumpConfig()
dumpConfig(opts)

err = config.ParseConfig()
cfg, err := config.ParseConfig(opts)
if err != nil {
log.Errorf("error in parsing config file: %v", err)
os.Exit(1)
Expand All @@ -171,14 +172,14 @@ func run() {
utils.SetupElegantExit()

// Create new flows pipeline
mainPipeline, err = pipeline.NewPipeline()
mainPipeline, err = pipeline.NewPipeline(&cfg)
if err != nil {
log.Fatalf("failed to initialize pipeline %s", err)
os.Exit(1)
}

// Start health report server
health.NewHealthServer(mainPipeline)
health.NewHealthServer(&opts, mainPipeline)

// Starts the flows pipeline
mainPipeline.Run()
Expand Down
36 changes: 24 additions & 12 deletions pkg/api/write_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,30 @@ type WriteLoki struct {
TimestampScale string `yaml:"timestampScale,omitempty" json:"timestampScale,omitempty" doc:"timestamp units scale (e.g. for UNIX = 1s)"`
}

func GetWriteLokiDefaults() WriteLoki {
return WriteLoki{
URL: "http://loki:3100/",
BatchWait: "1s",
BatchSize: 100 * 1024,
Timeout: "10s",
MinBackoff: "1s",
MaxBackoff: "5m",
MaxRetries: 10,
StaticLabels: model.LabelSet{},
TimestampLabel: "TimeReceived",
TimestampScale: "1s",
func (w *WriteLoki) SetDefaults() {
if w.BatchWait == "" {
w.BatchWait = "1s"
}
if w.BatchSize == 0 {
w.BatchSize = 100 * 1024
}
if w.Timeout == "" {
w.Timeout = "10s"
}
if w.MinBackoff == "" {
w.MinBackoff = "1s"
}
if w.MaxBackoff == "" {
w.MaxBackoff = "1s"
}
if w.MaxRetries == 0 {
w.MaxRetries = 10
}
if w.TimestampLabel == "" {
w.TimestampLabel = "TimeReceived"
}
if w.TimestampScale == "" {
w.TimestampScale = "1s"
}
}

Expand Down
79 changes: 34 additions & 45 deletions pkg/confgen/confgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"gopkg.in/yaml.v2"
)

var (
const (
definitionExt = ".yaml"
definitionHeader = "#flp_confgen"
configFileName = "config.yaml"
Expand All @@ -50,6 +50,7 @@ type Definition struct {
type Definitions []Definition

type ConfGen struct {
opts *Options
config *Config
transformRules api.NetworkTransformRules
aggregateDefinitions aggregate.Definitions
Expand All @@ -71,47 +72,52 @@ type DefFile struct {

func (cg *ConfGen) Run() error {
var err error
cg.config, err = cg.ParseConfigFile(Opt.SrcFolder + "/" + configFileName)
cg.config, err = cg.ParseConfigFile(cg.opts.SrcFolder + "/" + configFileName)
if err != nil {
log.Debugf("cg.ParseConfigFile err: %v ", err)
return err
}

definitionFiles := cg.GetDefinitionFiles(Opt.SrcFolder)
definitionFiles := getDefinitionFiles(cg.opts.SrcFolder)
for _, definitionFile := range definitionFiles {
err := cg.parseFile(definitionFile)
b, err := ioutil.ReadFile(definitionFile)
if err != nil {
log.Debugf("cg.parseFile err: %v ", err)
log.Debugf("ioutil.ReadFile err: %v ", err)
continue
}
err = cg.ParseDefinition(definitionFile, b)
if err != nil {
log.Debugf("cg.parseDefinition err: %v ", err)
continue
}
}

cg.Dedupe()
cg.dedupe()

if len(Opt.GenerateStages) != 0 {
config := cg.GenerateTruncatedConfig(Opt.GenerateStages)
err = cg.writeConfigFile(Opt.DestConfFile, config)
if len(cg.opts.GenerateStages) != 0 {
cfg := cg.GenerateTruncatedConfig()
err = cg.writeConfigFile(cg.opts.DestConfFile, cfg)
if err != nil {
log.Debugf("cg.GenerateTruncatedConfig err: %v ", err)
return err
}
return nil
} else {
config := cg.GenerateFlowlogs2PipelineConfig()
err = cg.writeConfigFile(Opt.DestConfFile, config)
err = cg.writeConfigFile(cg.opts.DestConfFile, config)
if err != nil {
log.Debugf("cg.GenerateFlowlogs2PipelineConfig err: %v ", err)
return err
}
}

err = cg.generateDoc(Opt.DestDocFile)
err = cg.generateDoc(cg.opts.DestDocFile)
if err != nil {
log.Debugf("cg.generateDoc err: %v ", err)
return err
}

err = cg.generateGrafanaJsonnet(Opt.DestGrafanaJsonnetFolder)
err = cg.generateGrafanaJsonnet(cg.opts.DestGrafanaJsonnetFolder)
if err != nil {
log.Debugf("cg.generateGrafanaJsonnet err: %v ", err)
return err
Expand All @@ -120,62 +126,44 @@ func (cg *ConfGen) Run() error {
return nil
}

func (cg *ConfGen) checkHeader(fileName string) error {
// check header
f, err := os.OpenFile(fileName, os.O_RDONLY, 0644)
if err != nil {
log.Debugf("os.OpenFile error: %v ", err)
return err
}
func checkHeader(bytes []byte) error {
header := make([]byte, len(definitionHeader))
_, err = f.Read(header)
if err != nil || string(header) != definitionHeader {
log.Debugf("Wrong header file: %s ", fileName)
copy(header, bytes)
if string(header) != definitionHeader {
return fmt.Errorf("wrong header")
}
err = f.Close()
if err != nil {
log.Debugf("f.Close err: %v ", err)
return err
}

return nil
}

func (cg *ConfGen) parseFile(fileName string) error {

func (cg *ConfGen) ParseDefinition(name string, bytes []byte) error {
// check header
err := cg.checkHeader(fileName)
err := checkHeader(bytes)
if err != nil {
log.Debugf("cg.checkHeader err: %v ", err)
log.Debugf("%s cg.checkHeader err: %v ", name, err)
return err
}

// parse yaml
var defFile DefFile
yamlFile, err := ioutil.ReadFile(fileName)
err = yaml.Unmarshal(bytes, &defFile)
if err != nil {
log.Debugf("ioutil.ReadFile err: %v ", err)
return err
}
err = yaml.Unmarshal(yamlFile, &defFile)
if err != nil {
log.Debugf("yaml.Unmarshal err: %v ", err)
log.Debugf("%s yaml.Unmarshal err: %v ", name, err)
return err
}

//skip if their skip tag match
for _, skipTag := range Opt.SkipWithTags {
for _, skipTag := range cg.opts.SkipWithTags {
for _, tag := range defFile.Tags {
if skipTag == tag {
return fmt.Errorf("skipping definition %s due to skip tag %s", fileName, tag)
log.Infof("skipping definition %s due to skip tag %s", name, tag)
return nil
}
}
}

// parse definition
definition := Definition{
FileName: fileName,
FileName: name,
Description: defFile.Description,
Details: defFile.Details,
Usage: defFile.Usage,
Expand Down Expand Up @@ -215,7 +203,7 @@ func (cg *ConfGen) parseFile(fileName string) error {
return nil
}

func (*ConfGen) GetDefinitionFiles(rootPath string) []string {
func getDefinitionFiles(rootPath string) []string {

var files []string

Expand All @@ -235,11 +223,12 @@ func (*ConfGen) GetDefinitionFiles(rootPath string) []string {
return files
}

func NewConfGen() (*ConfGen, error) {
func NewConfGen(opts *Options) *ConfGen {
return &ConfGen{
opts: opts,
transformRules: api.NetworkTransformRules{},
aggregateDefinitions: aggregate.Definitions{},
definitions: Definitions{},
visualizations: Visualizations{},
}, nil
}
}
Loading

0 comments on commit bfbd89b

Please sign in to comment.