diff --git a/README.md b/README.md
index 89a1f515f..57a6cb77b 100644
--- a/README.md
+++ b/README.md
@@ -112,14 +112,12 @@ FLP is a framework. The main FLP object is the **pipeline**. FLP **pipeline** ca
 The pipeline is constructed of a sequence of stages. Each stage is classified into one of the following types:
 - **ingest** - obtain flows from some source, one entry per line
-- **decode** - parse input lines into a known format, e.g., dictionary (map) of AWS or goflow data
 - **transform** - convert entries into a standard format; can include multiple transform stages
 - **write** - provide the means to write the data to some target, e.g. loki, standard output, etc
 - **extract** - derive a set of metrics from the imported flows
 - **encode** - make the data available in appropriate format (e.g. prometheus)
 
 The first stage in a pipeline must be an **ingest** stage.
-The **ingest** stage is typically followed by a **decode** stage, unless the **ingest** stage also performs the decoding.
 Each stage (other than an **ingest** stage) specifies the stage it follows.
 Multiple stages may follow from a particular stage, thus allowing the same data to be consumed by multiple parallel pipelines.
 For example, multiple **transform** stages may be performed and the results may be output to different targets.
@@ -132,14 +130,12 @@ A full configuration file with the data consumed by two different transforms mi
 ```yaml
 pipeline:
   - name: ingest1
-  - name: decode1
-    follows: ingest1
   - name: generic1
-    follows: decode1
+    follows: ingest1
   - name: write1
     follows: generic1
   - name: generic2
-    follows: decode1
+    follows: ingest1
   - name: write2
     follows: generic2
 parameters:
@@ -148,9 +144,8 @@ parameters:
       type: file_loop
       file:
         filename: hack/examples/ocp-ipfix-flowlogs.json
-  - name: decode1
-    decode:
-      type: json
+      decoder:
+        type: json
   - name: generic1
     transform:
       type: generic
@@ -200,19 +195,16 @@ For example:
 log-level: info
 pipeline:
   - name: ingest_file
-  - name: decode_json
-    follows: ingest_file
   - name: write_stdout
-    follows: write_stdout
-parameters
-  - name ingest_file
+    follows: ingest_file
+parameters:
+  - name: ingest_file
     ingest:
       type: file
       file:
         filename: hack/examples/ocp-ipfix-flowlogs.json
-  - name: decode_json
-    decode:
-      type: json
+      decoder:
+        type: json
   - name: write_stdout
     write:
       type: stdout
@@ -224,11 +216,11 @@ parameters
 
 2. Using command line parameters:
 
-`./flowlogs-pipeline --pipeline "[{\"name\":\"ingest1\"},{\"follows\":\"ingest1\",\"name\":\"decode1\"},{\"follows\":\"decode1\",\"name\":\"write1\"}]" --parameters "[{\"ingest\":{\"file\":{\"filename\":\"hack/examples/ocp-ipfix-flowlogs.json\"},\"type\":\"file\"},\"name\":\"ingest1\"},{\"decode\":{\"type\":\"json\"},\"name\":\"decode1\"},{\"name\":\"write1\",\"write\":{\"type\":\"stdout\"}}]"`
+`./flowlogs-pipeline --pipeline "[{\"name\":\"ingest1\"},{\"follows\":\"ingest1\",\"name\":\"write1\"}]" --parameters "[{\"ingest\":{\"file\":{\"filename\":\"hack/examples/ocp-ipfix-flowlogs.json\"},\"decoder\":{\"type\":\"json\"},\"type\":\"file\"},\"name\":\"ingest1\"},{\"name\":\"write1\",\"write\":{\"type\":\"stdout\"}}]"`
 
 Options included in the command line override the options specified in the config file.
 
-`flowlogs-pipeline --log-level debug --pipeline "[{\"name\":\"ingest1\"},{\"follows\":\"ingest1\",\"name\":\"decode1\"},{\"follows\":\"decode1\",\"name\":\"write1\"}]" --config `
+`flowlogs-pipeline --log-level debug --pipeline "[{\"name\":\"ingest1\"},{\"follows\":\"ingest1\",\"name\":\"write1\"}]" --config `
 
 3. TODO: environment variables
diff --git a/contrib/benchmarks/baseline/config.yaml b/contrib/benchmarks/baseline/config.yaml
index 5576b960d..62b195ceb 100644
--- a/contrib/benchmarks/baseline/config.yaml
+++ b/contrib/benchmarks/baseline/config.yaml
@@ -4,8 +4,8 @@ pipeline:
     type: file
     file:
      filename: ../../contrib/benchmarks/baseline/log-lines.json
-  decode:
-    type: json
+    decoder:
+      type: json
   encode:
     type: none
   extract:
diff --git a/contrib/benchmarks/transform/config.yaml b/contrib/benchmarks/transform/config.yaml
index f382456f5..76cc93c90 100644
--- a/contrib/benchmarks/transform/config.yaml
+++ b/contrib/benchmarks/transform/config.yaml
@@ -4,8 +4,8 @@ pipeline:
     type: file
     file:
      filename: ../../contrib/benchmarks/baseline/log-lines.json
-  decode:
-    type: json
+    decoder:
+      type: json
   encode:
     type: none
   extract:
diff --git a/contrib/kubernetes/flowlogs-pipeline.conf.yaml b/contrib/kubernetes/flowlogs-pipeline.conf.yaml
index 1ba5a7448..66e078d80 100644
--- a/contrib/kubernetes/flowlogs-pipeline.conf.yaml
+++ b/contrib/kubernetes/flowlogs-pipeline.conf.yaml
@@ -8,9 +8,6 @@ parameters:
       portLegacy: 2056
     type: collector
   name: ingest_collector
-- decode:
-    type: json
-  name: decode_json
 - name: transform_generic
   transform:
     generic:
@@ -328,8 +325,6 @@ parameters:
 pipeline:
 - name: ingest_collector
 - follows: ingest_collector
-  name: decode_json
-- follows: decode_json
   name: transform_generic
 - follows: transform_generic
   name: transform_network
diff --git a/hack/deploy-and-monitor-k8s-network-workload.sh b/hack/deploy-and-monitor-k8s-network-workload.sh
index 61f8f88d8..3a55ac513 100755
--- a/hack/deploy-and-monitor-k8s-network-workload.sh
+++ b/hack/deploy-and-monitor-k8s-network-workload.sh
@@ -31,8 +31,6 @@ pipeline:
       port: 2055
       portLegacy: 2056
     type: collector
-  decode:
-    type: json
   encode:
     type: none
   extract:
diff --git a/hack/examples/ocp-ipfix-config.yaml b/hack/examples/ocp-ipfix-config.yaml
index eec70395d..91df8f94a 100644
--- a/hack/examples/ocp-ipfix-config.yaml
+++ b/hack/examples/ocp-ipfix-config.yaml
@@ -3,7 +3,7 @@ pipeline:
     type: file
     file:
       filename: hack/examples/ocp-ipfix-flowlogs.json
-  decode:
-    type: json
+    decoder:
+      type: json
   transform:
     type: generic
\ No newline at end of file
diff --git a/pkg/confgen/flowlogs2metrics_config.go b/pkg/confgen/flowlogs2metrics_config.go
index 8c2d8e112..d2ca415c4 100644
--- a/pkg/confgen/flowlogs2metrics_config.go
+++ b/pkg/confgen/flowlogs2metrics_config.go
@@ -29,11 +29,8 @@ func (cg *ConfGen) generateFlowlogs2PipelineConfig(fileName string) error {
         "log-level": "error",
         "pipeline": []map[string]string{
             {"name": "ingest_collector"},
-            {"name": "decode_json",
-                "follows": "ingest_collector",
-            },
             {"name": "transform_generic",
-                "follows": "decode_json",
+                "follows": "ingest_collector",
             },
             {"name": "transform_network",
                 "follows": "transform_generic",
@@ -59,11 +56,6 @@ func (cg *ConfGen) generateFlowlogs2PipelineConfig(fileName string) error {
                 },
             },
         },
-        {"name": "decode_json",
-            "decode": map[string]interface{}{
-                "type": "json",
-            },
-        },
         {"name": "transform_generic",
             "transform": map[string]interface{}{
                 "type": "generic",
diff --git a/pkg/pipeline/ingest/ingest_kafka_test.go b/pkg/pipeline/ingest/ingest_kafka_test.go
index b48a5cfdb..c190c2764 100644
--- a/pkg/pipeline/ingest/ingest_kafka_test.go
+++ b/pkg/pipeline/ingest/ingest_kafka_test.go
@@ -18,7 +18,6 @@ package ingest
 
 import (
-    "encoding/json"
     "testing"
     "time"
@@ -126,16 +125,9 @@ func Test_IngestKafka(t *testing.T) {
     receivedEntries := <-ingestOutput
     require.Equal(t, 3, len(receivedEntries))
-    require.Equal(t, toMap(t, record1), receivedEntries[0])
-    require.Equal(t, toMap(t, record2), receivedEntries[1])
-    require.Equal(t, toMap(t, record3), receivedEntries[2])
-}
-
-func toMap(t *testing.T, in string) config.GenericMap {
-    var m config.GenericMap
-    err := json.Unmarshal([]byte(in), &m)
-    require.NoError(t, err)
-    return m
+    require.Equal(t, test.DeserializeJSONToMap(t, record1), receivedEntries[0])
+    require.Equal(t, test.DeserializeJSONToMap(t, record2), receivedEntries[1])
+    require.Equal(t, test.DeserializeJSONToMap(t, record3), receivedEntries[2])
 }
 
 type fakeKafkaReader struct {
@@ -184,5 +176,5 @@ func Test_KafkaListener(t *testing.T) {
     receivedEntries := <-ingestOutput
     require.Equal(t, 1, len(receivedEntries))
-    require.Equal(t, toMap(t, string(fakeRecord)), receivedEntries[0])
+    require.Equal(t, test.DeserializeJSONToMap(t, string(fakeRecord)), receivedEntries[0])
 }
diff --git a/pkg/pipeline/pipeline_builder.go b/pkg/pipeline/pipeline_builder.go
index 15ecfd601..cb42bbd10 100644
--- a/pkg/pipeline/pipeline_builder.go
+++ b/pkg/pipeline/pipeline_builder.go
@@ -6,7 +6,6 @@ import (
     "github.com/netobserv/flowlogs-pipeline/pkg/api"
     "github.com/netobserv/flowlogs-pipeline/pkg/config"
-    "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/decode"
     "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode"
     "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract"
     "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest"
@@ -45,7 +44,6 @@ type pipelineEntry struct {
     stageName   string
     stageType   string
     Ingester    ingest.Ingester
-    Decoder     decode.Decoder
     Transformer transform.Transformer
     Extractor   extract.Extractor
     Encoder     encode.Encoder
diff --git a/pkg/test/e2e/kafka/flp-config.yaml b/pkg/test/e2e/kafka/flp-config.yaml
index 44e70f33c..318177718 100644
--- a/pkg/test/e2e/kafka/flp-config.yaml
+++ b/pkg/test/e2e/kafka/flp-config.yaml
@@ -6,10 +6,8 @@ data:
   flowlogs-pipeline.conf.yaml: |
     pipeline:
       - name: kafka_ingest
-      - name: decode_json
-        follows: kafka_ingest
       - name: transform_none
-        follows: decode_json
+        follows: kafka_ingest
       - name: kafka_encode
         follows: transform_none
     parameters:
@@ -20,9 +18,8 @@ data:
             brokers: [my-cluster-kafka-bootstrap.default.svc:9092]
             topic: test_topic_in
            groupid: group_test_in
-      - name: decode_json
-        decode:
-          type: json
+          decoder:
+            type: json
       - name: transform_none
         transform:
           type: none
diff --git a/pkg/test/e2e/pipline/flp-config.yaml b/pkg/test/e2e/pipline/flp-config.yaml
index 7d787bbc7..1531175b2 100644
--- a/pkg/test/e2e/pipline/flp-config.yaml
+++ b/pkg/test/e2e/pipline/flp-config.yaml
@@ -11,11 +11,10 @@ data:
             hostname: 0.0.0.0
             port: 2055
             portLegacy: 2056
+          decoder:
+            type: json
           type: collector
         name: ingest_collector
-      - decode:
-          type: json
-        name: decode_json
       - name: transform_generic
         transform:
           generic:
@@ -288,8 +287,6 @@ data:
     pipeline:
     - name: ingest_collector
     - follows: ingest_collector
-      name: decode_json
-    - follows: decode_json
       name: transform_generic
     - follows: transform_generic
       name: transform_network
diff --git a/pkg/test/utils.go b/pkg/test/utils.go
index 1a3c6ae07..570d283a2 100644
--- a/pkg/test/utils.go
+++ b/pkg/test/utils.go
@@ -19,6 +19,7 @@ package test
 import (
     "bytes"
+    "encoding/json"
     "fmt"
     "os"
     "os/exec"
@@ -155,3 +156,10 @@ func RunCommand(command string) string {
     fmt.Printf("output = %s\n", string(output))
     return string(output)
 }
+
+func DeserializeJSONToMap(t *testing.T, in string) config.GenericMap {
+    var m config.GenericMap
+    err := json.Unmarshal([]byte(in), &m)
+    require.NoError(t, err)
+    return m
+}
diff --git a/playground/aws1.yml b/playground/aws1.yml
deleted file mode 100644
index 54eeeb862..000000000
--- a/playground/aws1.yml
+++ /dev/null
@@ -1,33 +0,0 @@
-log-level: debug
-pipeline:
-  ingest:
-    type: file
-    file:
-      filename: playground/aws1_input.txt
-  decode:
-    type: aws
-    aws:
-      fields:
-        - version
-        - account-id
-        - interface-id
-        - srcaddr
-        - dstaddr
-        - srcport
-        - dstport
-        - protocol
-        - packets
-        - bytes
-        - start
-        - end
-        - action
-        - log-status
-  transform:
-    - type: none
-  extract:
-    type: none
-  encode:
-    type: none
-  write:
-    type: stdout
-
diff --git a/playground/aws1_input.txt b/playground/aws1_input.txt
deleted file mode 100644
index 9bb5e79c8..000000000
--- a/playground/aws1_input.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-2 123456789010 eni-1235b8ca123456789 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK
-2 123456789010 eni-1235b8ca123456789 172.31.9.69 172.31.9.12 49761 3389 6 20 4249 1418530010 1418530070 REJECT OK
-2 123456789010 eni-1235b8ca123456789 203.0.113.12 172.31.16.139 0 0 1 4 336 1432917027 1432917142 ACCEPT OK
-2 123456789010 eni-1235b8ca123456789 172.31.16.139 203.0.113.12 0 0 1 4 336 1432917094 1432917142 REJECT OK
-2 123456789010 eni-1235b8ca123456789 2001:db8:1234:a100:8d6e:3477:df66:f105 2001:db8:1234:a102:3304:8879:34cf:4071 34892 22 6 54 8855 1477913708 1477913820 ACCEPT OK
diff --git a/playground/aws2.yml b/playground/aws2.yml
deleted file mode 100644
index 615231cf1..000000000
--- a/playground/aws2.yml
+++ /dev/null
@@ -1,17 +0,0 @@
-log-level: debug
-pipeline:
-  ingest:
-    type: file
-    file:
-      filename: playground/aws2_input.txt
-  decode:
-    type: aws
-  transform:
-    - type: none
-  extract:
-    type: none
-  encode:
-    type: none
-  write:
-    type: none
-
diff --git a/playground/aws2_input.txt b/playground/aws2_input.txt
deleted file mode 100644
index 9bb5e79c8..000000000
--- a/playground/aws2_input.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-2 123456789010 eni-1235b8ca123456789 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK
-2 123456789010 eni-1235b8ca123456789 172.31.9.69 172.31.9.12 49761 3389 6 20 4249 1418530010 1418530070 REJECT OK
-2 123456789010 eni-1235b8ca123456789 203.0.113.12 172.31.16.139 0 0 1 4 336 1432917027 1432917142 ACCEPT OK
-2 123456789010 eni-1235b8ca123456789 172.31.16.139 203.0.113.12 0 0 1 4 336 1432917094 1432917142 REJECT OK
-2 123456789010 eni-1235b8ca123456789 2001:db8:1234:a100:8d6e:3477:df66:f105 2001:db8:1234:a102:3304:8879:34cf:4071 34892 22 6 54 8855 1477913708 1477913820 ACCEPT OK
diff --git a/playground/aws3.yml b/playground/aws3.yml
deleted file mode 100644
index 1dc671b40..000000000
--- a/playground/aws3.yml
+++ /dev/null
@@ -1,40 +0,0 @@
-log-level: debug
-pipeline:
-  ingest:
-    type: file
-    file:
-      filename: playground/aws3_input.txt
-  decode:
-    type: aws
-    aws:
-      fields:
-        - version
-        - vpc-id
-        - subnet-id
-        - instance-id
-        - interface-id
-        - account-id
-        - type
-        - srcaddr
-        - dstaddr
-        - srcport
-        - dstport
-        - pkt-srcaddr
-        - pkt-dstaddr
-        - protocol
-        - bytes
-        - packets
-        - start
-        - end
-        - action
-        - tcp-flags
-        - log-status
-  transform:
-    - type: none
-  extract:
-    type: none
-  encode:
-    type: none
-  write:
-    type: none
-
diff --git a/playground/aws3_input.txt b/playground/aws3_input.txt
deleted file mode 100644
index 8995128d0..000000000
--- a/playground/aws3_input.txt
+++ /dev/null
@@ -1,6 +0,0 @@
-3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43416 5001 52.213.180.42 10.0.0.62 6 568 8 1566848875 1566848933 ACCEPT 2 OK
-3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43416 10.0.0.62 52.213.180.42 6 376 7 1566848875 1566848933 ACCEPT 18 OK
-3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43418 5001 52.213.180.42 10.0.0.62 6 100701 70 1566848875 1566848933 ACCEPT 2 OK
-3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43418 10.0.0.62 52.213.180.42 6 632 12 1566848875 1566848933 ACCEPT 18 OK
-3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43418 10.0.0.62 52.213.180.42 6 63388 1219 1566848933 1566849113 ACCEPT 1 OK
-3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43418 5001 52.213.180.42 10.0.0.62 6 23294588 15774 1566848933 1566849113 ACCEPT 1 OK
diff --git a/playground/goflow3.yml b/playground/goflow3.yml
index 4b1077537..06e02ad3b 100644
--- a/playground/goflow3.yml
+++ b/playground/goflow3.yml
@@ -1,10 +1,8 @@
 log-level: debug
 pipeline:
   - name: ingest1
-  - name: decode1
-    follows: ingest1
   - name: generic1
-    follows: decode1
+    follows: ingest1
   - name: write1
     follows: generic1
 parameters:
@@ -13,9 +11,8 @@ parameters:
       type: file
       file:
        filename: playground/goflow2_input.txt
-  - name: decode1
-    decode:
-      type: json
+      decoder:
+        type: json
   - name: generic1
     transform:
       type: generic
diff --git a/playground/goflow7.yml b/playground/goflow7.yml
index 65474487e..bfe0254ce 100644
--- a/playground/goflow7.yml
+++ b/playground/goflow7.yml
@@ -2,25 +2,19 @@ log-level: info
 pipeline:
   - stage: ingest
     name: ingest1
-  - stage: decode
-    name: decode1
-    follows: ingest1
   - stage: transform
     name: generic1
-    follows: decode1
+    follows: ingest1
   - stage: write
     name: write1
     follows: generic1
   - stage: transform
     name: generic2
-    follows: decode1
+    follows: ingest1
   - stage: write
     name: write2
     follows: generic2
 parameters:
-  - name: decode1
-    decode:
-      type: json
   - name: generic1
     transform:
       type: generic
@@ -55,6 +49,8 @@ parameters:
       type: file_loop
       file:
         filename: playground/goflow2_input.txt
+      decoder:
+        type: json
   - name: write2
     write:
       type: stdout
diff --git a/playground/kafka4.yml b/playground/kafka4.yml
index 8750390b7..531c0fa5e 100644
--- a/playground/kafka4.yml
+++ b/playground/kafka4.yml
@@ -1,10 +1,8 @@
 log-level: debug
 pipeline:
   - name: ingest_kafka
-  - name: decode_json
-    follows: ingest_kafka
   - name: encode_kafka
-    follows: decode_json
+    follows: ingest_kafka
 parameters:
   - name: ingest_kafka
     ingest:
@@ -13,9 +11,8 @@ parameters:
       type: kafka
       kafka:
         brokers: ["192.168.56.103:9092"]
         topic: topic1
         groupid: group1
-  - name: decode_json
-    decode:
-      type: json
+      decoder:
+        type: json
   - name: encode_kafka
     encode:
       type: kafka
diff --git a/playground/prom1.yml b/playground/prom1.yml
index c33c38c0b..0c4cd04b2 100644
--- a/playground/prom1.yml
+++ b/playground/prom1.yml
@@ -1,10 +1,8 @@
 log-level: debug
 pipeline:
   - name: ingest1
-  - name: decode1
-    follows: ingest1
   - name: transform1
-    follows: decode1
+    follows: ingest1
   - name: encode1
     follows: transform1
   - name: write1
@@ -15,9 +13,8 @@ parameters:
       type: file_loop
       file:
         filename: playground/goflow2_input.txt
-  - name: decode1
-    decode:
-      type: json
+      decoder:
+        type: json
   - name: transform1
     transform:
       type: generic
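
For quick reference when updating existing configurations against this change: JSON decoding is now configured through the ingest stage's `decoder` section instead of a separate decode stage, and any stage that previously declared `follows: decode_json` (or similar) should now follow the ingest stage directly. Below is a minimal sketch of a post-change configuration; it simply mirrors the README example touched in this patch (the stage names and input file are illustrative, not additional changes).

```yaml
# Minimal post-change pipeline: the former decode stage is gone,
# decoding is declared under the ingest stage's parameters.
log-level: info
pipeline:
  - name: ingest_file
  - name: write_stdout
    follows: ingest_file
parameters:
  - name: ingest_file
    ingest:
      type: file
      file:
        filename: hack/examples/ocp-ipfix-flowlogs.json
      decoder:
        type: json
  - name: write_stdout
    write:
      type: stdout
```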