Remove Decoder stage (breaking change) #225

Merged: 3 commits, Jun 16, 2022
30 changes: 11 additions & 19 deletions README.md
@@ -112,14 +112,12 @@ FLP is a framework. The main FLP object is the **pipeline**. FLP **pipeline** ca

The pipeline is constructed of a sequence of stages. Each stage is classified into one of the following types:
- **ingest** - obtain flows from some source, one entry per line
- **decode** - parse input lines into a known format, e.g., dictionary (map) of AWS or goflow data
- **transform** - convert entries into a standard format; can include multiple transform stages
- **write** - provide the means to write the data to some target, e.g. loki, standard output, etc
- **extract** - derive a set of metrics from the imported flows
- **encode** - make the data available in appropriate format (e.g. prometheus)

The first stage in a pipeline must be an **ingest** stage.
The **ingest** stage is typically followed by a **decode** stage, unless the **ingest** stage also performs the decoding.
Each stage (other than an **ingest** stage) specifies the stage it follows.
Multiple stages may follow from a particular stage, thus allowing the same data to be consumed by multiple parallel pipelines.
For example, multiple **transform** stages may be performed and the results may be output to different targets.
@@ -132,14 +130,12 @@ A full configuration file with the data consumed by two different transforms mi
```yaml
pipeline:
- name: ingest1
- name: decode1
follows: ingest1
- name: generic1
follows: decode1
follows: ingest1
- name: write1
follows: generic1
- name: generic2
follows: decode1
follows: ingest1
- name: write2
follows: generic2
parameters:
@@ -148,9 +144,8 @@ parameters:
type: file_loop
file:
filename: hack/examples/ocp-ipfix-flowlogs.json
- name: decode1
decode:
type: json
decoder:
type: json
- name: generic1
transform:
type: generic
@@ -200,19 +195,16 @@ For example:
log-level: info
pipeline:
- name: ingest_file
- name: decode_json
follows: ingest_file
- name: write_stdout
follows: write_stdout
parameters
- name ingest_file
follows: ingest_file
parameters:
- name: ingest_file
ingest:
type: file
file:
filename: hack/examples/ocp-ipfix-flowlogs.json
- name: decode_json
decode:
type: json
decoder:
type: json
- name: write_stdout
write:
type: stdout
@@ -224,11 +216,11 @@ parameters

2. Using command line parameters:

`./flowlogs-pipeline --pipeline "[{\"name\":\"ingest1\"},{\"follows\":\"ingest1\",\"name\":\"decode1\"},{\"follows\":\"decode1\",\"name\":\"write1\"}]" --parameters "[{\"ingest\":{\"file\":{\"filename\":\"hack/examples/ocp-ipfix-flowlogs.json\"},\"type\":\"file\"},\"name\":\"ingest1\"},{\"decode\":{\"type\":\"json\"},\"name\":\"decode1\"},{\"name\":\"write1\",\"write\":{\"type\":\"stdout\"}}]"`
`./flowlogs-pipeline --pipeline "[{\"name\":\"ingest1\"},{\"follows\":\"ingest1\",\"name\":\"write1\"}]" --parameters "[{\"ingest\":{\"file\":{\"filename\":\"hack/examples/ocp-ipfix-flowlogs.json\"},\"decoder\":{\"type\":\"json\"},\"type\":\"file\"},\"name\":\"ingest1\"},{\"name\":\"write1\",\"write\":{\"type\":\"stdout\"}}]"`

Options included in the command line override the options specified in the config file.

`flowlogs-pipeline --log-level debug --pipeline "[{\"name\":\"ingest1\"},{\"follows\":\"ingest1\",\"name\":\"decode1\"},{\"follows\":\"decode1\",\"name\":\"write1\"}]" --config <configFile>`
`flowlogs-pipeline --log-level debug --pipeline "[{\"name\":\"ingest1\"},{\"follows\":\"ingest1\",\"name\":\"write1\"}]" --config <configFile>`

3. TODO: environment variables

4 changes: 2 additions & 2 deletions contrib/benchmarks/baseline/config.yaml
@@ -4,8 +4,8 @@ pipeline:
type: file
file:
filename: ../../contrib/benchmarks/baseline/log-lines.json
decode:
type: json
decoder:
type: json
encode:
type: none
extract:
4 changes: 2 additions & 2 deletions contrib/benchmarks/transform/config.yaml
@@ -4,8 +4,8 @@ pipeline:
type: file
file:
filename: ../../contrib/benchmarks/baseline/log-lines.json
decode:
type: json
decoder:
type: json
encode:
type: none
extract:
5 changes: 0 additions & 5 deletions contrib/kubernetes/flowlogs-pipeline.conf.yaml
@@ -8,9 +8,6 @@ parameters:
portLegacy: 2056
type: collector
name: ingest_collector
- decode:
type: json
name: decode_json
- name: transform_generic
transform:
generic:
@@ -328,8 +325,6 @@ parameters:
pipeline:
- name: ingest_collector
- follows: ingest_collector
name: decode_json
- follows: decode_json
name: transform_generic
- follows: transform_generic
name: transform_network
11 changes: 4 additions & 7 deletions docs/api.md
@@ -59,6 +59,10 @@ Following is the supported API format for the kafka ingest:
groupBalancers: list of balancing strategies (range, roundRobin, rackAffinity)
startOffset: FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition
batchReadTimeout: how often (in milliseconds) to process input
decoder: decoder to use (E.g. json or protobuf)
type: (enum) one of the following:
json: JSON decoder
protobuf: Protobuf decoder
</pre>
## Ingest GRPC from Network Observability eBPF Agent
Following is the supported API format for the Network Observability eBPF ingest:
@@ -68,13 +72,6 @@ Following is the supported API format for the Network Observability eBPF ingest:
port: the port number to listen on
bufferLength: the length of the ingest channel buffer, in groups of flows, containing each group hundreds of flows (default: 100)
</pre>
## Aws ingest API
Following is the supported API format for Aws flow entries:

<pre>
aws:
fields: list of aws flow log fields
</pre>
## Transform Generic API
Following is the supported API format for generic transformations:

2 changes: 0 additions & 2 deletions hack/deploy-and-monitor-k8s-network-workload.sh
@@ -31,8 +31,6 @@ pipeline:
port: 2055
portLegacy: 2056
type: collector
decode:
type: json
encode:
type: none
extract:
4 changes: 2 additions & 2 deletions hack/examples/ocp-ipfix-config.yaml
@@ -3,7 +3,7 @@ pipeline:
type: file
file:
filename: hack/examples/ocp-ipfix-flowlogs.json
decode:
type: json
decoder:
type: json
transform:
type: generic
4 changes: 0 additions & 4 deletions pkg/api/api.go
@@ -24,9 +24,6 @@ const (
CollectorType = "collector"
GRPCType = "grpc"
KafkaType = "kafka"
JSONType = "json"
PBType = "protobuf"
AWSType = "aws"
StdoutType = "stdout"
LokiType = "loki"
AggregateType = "aggregates"
@@ -56,7 +53,6 @@ type API struct {
IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the NetFlow / IPFIX collector:\n"`
IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"`
IngestGRPCProto IngestGRPCProto `yaml:"grpc" doc:"## Ingest GRPC from Network Observability eBPF Agent\nFollowing is the supported API format for the Network Observability eBPF ingest:\n"`
DecodeAws DecodeAws `yaml:"aws" doc:"## Aws ingest API\nFollowing is the supported API format for Aws flow entries:\n"`
TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"`
TransformFilter TransformFilter `yaml:"filter" doc:"## Transform Filter API\nFollowing is the supported API format for filter transformations:\n"`
TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"`
22 changes: 0 additions & 22 deletions pkg/api/decode_aws.go

This file was deleted.

14 changes: 14 additions & 0 deletions pkg/api/decoder.go
@@ -0,0 +1,14 @@
package api

type Decoder struct {
Type string `yaml:"type" json:"type" enum:"DecoderEnum" doc:"one of the following:"`
}

type DecoderEnum struct {
JSON string `yaml:"json" json:"json" doc:"JSON decoder"`
Protobuf string `yaml:"protobuf" json:"protobuf" doc:"Protobuf decoder"`
}

func DecoderName(decoder string) string {
return GetEnumName(DecoderEnum{}, decoder)
}
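
As a side note, here is a minimal sketch (not part of the diff) of what the new helper resolves, inferred from how it is used in `pkg/pipeline/decode/decode.go` further down:

```go
package main

import (
	"fmt"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
)

func main() {
	// DecoderName maps the enum field name to its YAML/JSON value,
	// so it can be compared against the configured decoder type.
	fmt.Println(api.DecoderName("JSON"))     // json
	fmt.Println(api.DecoderName("Protobuf")) // protobuf
}
```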
3 changes: 2 additions & 1 deletion pkg/api/enum.go
@@ -30,6 +30,7 @@ type enums struct {
KafkaEncodeBalancerEnum KafkaEncodeBalancerEnum
ConnTrackOperationEnum ConnTrackOperationEnum
ConnTrackOutputRecordTypeEnum ConnTrackOutputRecordTypeEnum
DecoderEnum DecoderEnum
}

type enumNameCacheKey struct {
@@ -67,7 +68,7 @@ func GetEnumName(enum interface{}, operation string) string {
if found {
return cachedValue
} else {
log.Panicf("can't find operation %s in enum %v", operation, enum)
log.Panicf("can't find name '%s' in enum %v", operation, enum)
return ""
}
}
1 change: 1 addition & 0 deletions pkg/api/ingest_kafka.go
@@ -24,4 +24,5 @@ type IngestKafka struct {
GroupBalancers []string `yaml:"groupBalancers,omitempty" json:"groupBalancers,omitempty" doc:"list of balancing strategies (range, roundRobin, rackAffinity)"`
StartOffset string `yaml:"startOffset,omitempty" json:"startOffset,omitempty" doc:"FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition"`
BatchReadTimeout int64 `yaml:"batchReadTimeout,omitempty" json:"batchReadTimeout,omitempty" doc:"how often (in milliseconds) to process input"`
Decoder Decoder `yaml:"decoder,omitempty" json:"decoder" doc:"decoder to use (E.g. json or protobuf)"`
}
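
For illustration, a hedged sketch of how the new field surfaces in a pipeline configuration; the broker, topic and stage name are placeholders, and the nesting follows the JSON emitted in the builder test further down:

```yaml
parameters:
  - name: ingest_kafka
    ingest:
      type: kafka
      kafka:
        brokers: ["kafka:9092"]
        topic: netflows
        groupid: my-group
        decoder:
          type: json
```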
10 changes: 1 addition & 9 deletions pkg/confgen/flowlogs2metrics_config.go
@@ -29,11 +29,8 @@ func (cg *ConfGen) generateFlowlogs2PipelineConfig(fileName string) error {
"log-level": "error",
"pipeline": []map[string]string{
{"name": "ingest_collector"},
{"name": "decode_json",
"follows": "ingest_collector",
},
{"name": "transform_generic",
"follows": "decode_json",
"follows": "ingest_collector",
},
{"name": "transform_network",
"follows": "transform_generic",
@@ -59,11 +56,6 @@ func (cg *ConfGen) generateFlowlogs2PipelineConfig(fileName string) error {
},
},
},
{"name": "decode_json",
"decode": map[string]interface{}{
"type": "json",
},
},
{"name": "transform_generic",
"transform": map[string]interface{}{
"type": "generic",
13 changes: 4 additions & 9 deletions pkg/config/config.go
@@ -48,7 +48,6 @@ type Stage struct {
type StageParam struct {
Name string `json:"name"`
Ingest *Ingest `json:"ingest,omitempty"`
Decode *Decode `json:"decode,omitempty"`
Transform *Transform `json:"transform,omitempty"`
Extract *Extract `json:"extract,omitempty"`
Encode *Encode `json:"encode,omitempty"`
@@ -64,14 +63,10 @@ type Ingest struct {
}

type File struct {
Filename string `json:"filename"`
Loop bool `json:"loop"`
Chunks int `json:"chunks"`
}

type Decode struct {
Type string `json:"type"`
Aws *api.DecodeAws `json:"aws,omitempty"`
Filename string `json:"filename"`
Decoder api.Decoder `json:"decoder"`
Loop bool `json:"loop"`
Chunks int `json:"chunks"`
}

type Transform struct {
15 changes: 0 additions & 15 deletions pkg/config/pipeline_builder.go
@@ -78,21 +78,6 @@ func (b *PipelineBuilderStage) next(name string, param StageParam) PipelineBuild
return PipelineBuilderStage{pipeline: b.pipeline, lastStage: name}
}

// DecodeJSON chains the current stage with a JSON decode stage and returns that new stage
func (b *PipelineBuilderStage) DecodeJSON(name string) PipelineBuilderStage {
return b.next(name, StageParam{Name: name, Decode: &Decode{Type: api.JSONType}})
}

// DecodeProtobuf chains the current stage with a protobuf decode stage and returns that new stage
func (b *PipelineBuilderStage) DecodeProtobuf(name string) PipelineBuilderStage {
return b.next(name, StageParam{Name: name, Decode: &Decode{Type: api.PBType}})
}

// DecodeAWS chains the current stage with an AWS decode stage and returns that new stage
func (b *PipelineBuilderStage) DecodeAWS(name string, aws api.DecodeAws) PipelineBuilderStage {
return b.next(name, StageParam{Name: name, Decode: &Decode{Type: api.AWSType, Aws: &aws}})
}

// Aggregate chains the current stage with an aggregate stage and returns that new stage
func (b *PipelineBuilderStage) Aggregate(name string, aggs []api.AggregateDefinition) PipelineBuilderStage {
return b.next(name, StageParam{Name: name, Extract: &Extract{Type: api.AggregateType, Aggregates: aggs}})
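With the `DecodeJSON`, `DecodeProtobuf` and `DecodeAWS` helpers removed, the decoder now travels with the ingest stage itself. A minimal sketch of what building such a pipeline might look like; the `NewKafkaPipeline` entry point is an assumption based on the test below, not something visible in this diff:

```go
package main

import (
	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/flowlogs-pipeline/pkg/config"
)

func main() {
	// Hypothetical builder usage: the ingest stage carries its own decoder,
	// so no Decode* stage is chained after it.
	pl := config.NewKafkaPipeline("ingest", api.IngestKafka{
		Brokers: []string{"http://kafka"},
		Topic:   "netflows",
		GroupId: "my-group",
		Decoder: api.Decoder{Type: "json"},
	})
	pl = pl.TransformFilter("filter", api.TransformFilter{})
}
```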
3 changes: 2 additions & 1 deletion pkg/config/pipeline_builder_test.go
@@ -97,6 +97,7 @@ func TestKafkaPromPipeline(t *testing.T) {
Brokers: []string{"http://kafka"},
Topic: "netflows",
GroupId: "my-group",
Decoder: api.Decoder{Type: "json"},
})
pl = pl.TransformFilter("filter", api.TransformFilter{
Rules: []api.TransformFilterRule{{
@@ -136,7 +137,7 @@

b, err = json.Marshal(params[0])
require.NoError(t, err)
require.Equal(t, `{"name":"ingest","ingest":{"type":"kafka","kafka":{"brokers":["http://kafka"],"topic":"netflows","groupid":"my-group"}}}`, string(b))
require.Equal(t, `{"name":"ingest","ingest":{"type":"kafka","kafka":{"brokers":["http://kafka"],"topic":"netflows","groupid":"my-group","decoder":{"type":"json"}}}}`, string(b))

b, err = json.Marshal(params[1])
require.NoError(t, err)
26 changes: 11 additions & 15 deletions pkg/pipeline/decode/decode.go
@@ -18,26 +18,22 @@
package decode

import (
"fmt"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
log "github.com/sirupsen/logrus"
)

type Decoder interface {
Decode(in []interface{}) []config.GenericMap
}

type decodeNone struct {
}

// Decode decodes input strings to a list of flow entries
func (c *decodeNone) Decode(in []interface{}) []config.GenericMap {
log.Debugf("entering Decode none")
log.Debugf("Decode none, in = %v", in)
var f []config.GenericMap
return f
}

// NewDecodeNone create a new decode
func NewDecodeNone() (Decoder, error) {
return &decodeNone{}, nil
func GetDecoder(params api.Decoder) (Decoder, error) {
switch params.Type {
case api.DecoderName("JSON"):
return NewDecodeJson()
case api.DecoderName("Protobuf"):
return NewProtobuf()
}
panic(fmt.Sprintf("`decode` type %s not defined", params.Type))
}
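
To round this out, a hedged sketch (hypothetical caller, not part of the PR) of how an ingest implementation can now obtain and use a decoder through the new `GetDecoder`:

```go
package main

import (
	"fmt"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/decode"
)

func main() {
	// The decoder is resolved from the ingest configuration rather than
	// from a separate pipeline stage.
	dec, err := decode.GetDecoder(api.Decoder{Type: "json"})
	if err != nil {
		panic(err)
	}
	// Raw lines as an ingester would read them; Decode turns them into
	// GenericMap flow entries.
	lines := []interface{}{`{"SrcAddr":"10.0.0.1","DstAddr":"10.0.0.2"}`}
	fmt.Println(dec.Decode(lines))
}
```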