Skip to content

Commit

Permalink
multipipe (netobserv#83)
Browse files Browse the repository at this point in the history
* implemented multi-pipe and ported tests; Loki not working properly

* fixed write-loki tests; refactored some code

* removed extra line from merge

* addressed review comments

* fixed structure of flowlogs-pipeline.conf.yaml and addressed other reviewer comments

* fixed copying of data for nextStages

* removed unnecessary test of result

* removed redundant call to InitConfig
  • Loading branch information
KalmanMeth committed Mar 2, 2022
1 parent 553f43f commit 6d1be78
Show file tree
Hide file tree
Showing 52 changed files with 1,449 additions and 1,136 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ dashboards: $(JB) $(JSONNET) ## Build grafana dashboards
docs: FORCE ## Update flowlogs-pipeline documentation
@./hack/update-docs.sh
@go run cmd/apitodoc/main.go > docs/api.md

.PHONY: clean
clean: ## Clean
rm -f "${FLP_BIN_FILE}"
Expand Down
24 changes: 6 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,12 @@ Usage:
flowlogs-pipeline [flags]

Flags:
--config string config file (default is $HOME/.flowlogs-pipeline)
--health.port string Health server port (default "8080")
-h, --help help for flowlogs-pipeline
--log-level string Log level: debug, info, warning, error (default "error")
--pipeline.decode.aws string aws fields
--pipeline.decode.type string Decode type: aws, json, none
--pipeline.encode.kafka string Kafka encode API
--pipeline.encode.prom string Prometheus encode API
--pipeline.encode.type string Encode type: prom, json, kafka, none
--pipeline.extract.aggregates string Aggregates (see docs)
--pipeline.extract.type string Extract type: aggregates, none
--pipeline.ingest.collector string Ingest collector API
--pipeline.ingest.file.filename string Ingest filename (file)
--pipeline.ingest.kafka string Ingest Kafka API
    --pipeline.ingest.type string          Ingest type: file, collector, file_loop (required)
--pipeline.transform string Transforms (list) API (default "[{"type": "none"}]")
--pipeline.write.loki string Loki write API
--pipeline.write.type string Write type: stdout, none
--config string config file (default is $HOME/.flowlogs-pipeline)
--health.port string Health server port (default "8080")
-h, --help help for flowlogs-pipeline
--log-level string Log level: debug, info, warning, error (default "error")
--parameters string json of config file parameters field
--pipeline string json of config file pipeline field
```
<!---END-AUTO-flowlogs-pipeline_help--->

Expand Down
26 changes: 9 additions & 17 deletions cmd/flowlogs-pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,31 +126,17 @@ func bindFlags(cmd *cobra.Command, v *viper.Viper) {

func initFlags() {
cobra.OnInitialize(initConfig)

rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", fmt.Sprintf("config file (default is $HOME/%s)", defaultLogFileName))
rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "error", "Log level: debug, info, warning, error")
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Type, "pipeline.ingest.type", "", "Ingest type: file, collector,file_loop (required)")
rootCmd.PersistentFlags().StringVar(&config.Opt.Health.Port, "health.port", "8080", "Health server port")
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.File.Filename, "pipeline.ingest.file.filename", "", "Ingest filename (file)")
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Collector, "pipeline.ingest.collector", "", "Ingest collector API")
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Kafka, "pipeline.ingest.kafka", "", "Ingest Kafka API")
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Decode.Aws, "pipeline.decode.aws", "", "aws fields")
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Decode.Type, "pipeline.decode.type", "", "Decode type: aws, json, none")
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Transform, "pipeline.transform", "[{\"type\": \"none\"}]", "Transforms (list) API")
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Extract.Type, "pipeline.extract.type", "", "Extract type: aggregates, none")
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Extract.Aggregates, "pipeline.extract.aggregates", "", "Aggregates (see docs)")
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Write.Type, "pipeline.write.type", "", "Write type: stdout, none")
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Write.Loki, "pipeline.write.loki", "", "Loki write API")
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Encode.Type, "pipeline.encode.type", "", "Encode type: prom, json, kafka, none")
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Encode.Prom, "pipeline.encode.prom", "", "Prometheus encode API")
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Encode.Kafka, "pipeline.encode.kafka", "", "Kafka encode API")

_ = rootCmd.MarkPersistentFlagRequired("pipeline.ingest.type")
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine, "pipeline", "", "json of config file pipeline field")
rootCmd.PersistentFlags().StringVar(&config.Opt.Parameters, "parameters", "", "json of config file parameters field")
}

func main() {
// Initialize flags (command line parameters)
initFlags()

if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
Expand All @@ -170,6 +156,12 @@ func run() {
// Dump configuration
dumpConfig()

err = config.ParseConfig()
if err != nil {
log.Errorf("error in parsing config file: %v", err)
os.Exit(1)
}

// Setup (threads) exit manager
utils.SetupElegantExit()

Expand Down
2 changes: 1 addition & 1 deletion contrib/dashboards/jsonnet/dashboard_details.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ dashboard.new(
)
.addTarget(
prometheus.target(
expr='topk(10,rate(flp_egress_per_namespace{aggregate=~".*pod.*"}[1m]))',
expr='topk(10,rate(flp_egress_per_namespace{aggregate=~".*Pod.*"}[1m]))',
)
), gridPos={
x: 0,
Expand Down
Loading

0 comments on commit 6d1be78

Please sign in to comment.