
Commit

Update test data and confgen
jotak committed Jun 14, 2022
1 parent 3f1596d commit b634451
Showing 22 changed files with 48 additions and 198 deletions.
30 changes: 11 additions & 19 deletions README.md
@@ -112,14 +112,12 @@ FLP is a framework. The main FLP object is the **pipeline**. FLP **pipeline** ca
 
 The pipeline is constructed of a sequence of stages. Each stage is classified into one of the following types:
 - **ingest** - obtain flows from some source, one entry per line
-- **decode** - parse input lines into a known format, e.g., dictionary (map) of AWS or goflow data
 - **transform** - convert entries into a standard format; can include multiple transform stages
 - **write** - provide the means to write the data to some target, e.g. loki, standard output, etc
 - **extract** - derive a set of metrics from the imported flows
 - **encode** - make the data available in appropriate format (e.g. prometheus)
 
 The first stage in a pipeline must be an **ingest** stage.
-The **ingest** stage is typically followed by a **decode** stage, unless the **ingest** stage also performs the decoding.
 Each stage (other than an **ingest** stage) specifies the stage it follows.
 Multiple stages may follow from a particular stage, thus allowing the same data to be consumed by multiple parallel pipelines.
 For example, multiple **transform** stages may be performed and the results may be output to different targets.
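The wiring rules above (the first stage ingests, every later stage names the stage it follows, and several stages may fan out from one predecessor) can be sketched as a small validator. This is an illustrative helper, not part of FLP:

```go
package main

import "fmt"

// Stage mirrors one entry of the pipeline section: a name plus the
// stage it follows (empty for the first, ingesting stage).
type Stage struct {
	Name    string
	Follows string
}

// validate checks the wiring rules: the first stage has no "follows",
// and every other stage follows an already-declared stage. Because
// "seen" is a set, any number of stages may follow the same predecessor.
func validate(stages []Stage) error {
	seen := map[string]bool{}
	for i, s := range stages {
		if i == 0 {
			if s.Follows != "" {
				return fmt.Errorf("first stage %q must be an ingest stage without 'follows'", s.Name)
			}
		} else if !seen[s.Follows] {
			return fmt.Errorf("stage %q follows unknown stage %q", s.Name, s.Follows)
		}
		seen[s.Name] = true
	}
	return nil
}

func main() {
	// Two transforms fan out from the same ingest stage, as in the
	// README example after this commit.
	stages := []Stage{
		{Name: "ingest1"},
		{Name: "generic1", Follows: "ingest1"},
		{Name: "generic2", Follows: "ingest1"},
		{Name: "write1", Follows: "generic1"},
		{Name: "write2", Follows: "generic2"},
	}
	fmt.Println(validate(stages) == nil) // prints "true"
}
```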
@@ -132,14 +130,12 @@ A full configuration file with the data consumed by two different transforms mi
 ```yaml
 pipeline:
   - name: ingest1
-  - name: decode1
-    follows: ingest1
   - name: generic1
-    follows: decode1
+    follows: ingest1
   - name: write1
     follows: generic1
   - name: generic2
-    follows: decode1
+    follows: ingest1
   - name: write2
     follows: generic2
 parameters:
@@ -148,9 +144,8 @@ parameters:
       type: file_loop
       file:
         filename: hack/examples/ocp-ipfix-flowlogs.json
-  - name: decode1
-    decode:
-      type: json
+      decoder:
+        type: json
   - name: generic1
     transform:
       type: generic
@@ -200,19 +195,16 @@ For example:
 log-level: info
 pipeline:
   - name: ingest_file
-  - name: decode_json
-    follows: ingest_file
   - name: write_stdout
-    follows: write_stdout
-parameters
-  - name ingest_file
+    follows: ingest_file
+parameters:
+  - name: ingest_file
     ingest:
       type: file
       file:
         filename: hack/examples/ocp-ipfix-flowlogs.json
-  - name: decode_json
-    decode:
-      type: json
+      decoder:
+        type: json
   - name: write_stdout
     write:
       type: stdout
@@ -224,11 +216,11 @@ parameters

2. Using command line parameters:

-`./flowlogs-pipeline --pipeline "[{\"name\":\"ingest1\"},{\"follows\":\"ingest1\",\"name\":\"decode1\"},{\"follows\":\"decode1\",\"name\":\"write1\"}]" --parameters "[{\"ingest\":{\"file\":{\"filename\":\"hack/examples/ocp-ipfix-flowlogs.json\"},\"type\":\"file\"},\"name\":\"ingest1\"},{\"decode\":{\"type\":\"json\"},\"name\":\"decode1\"},{\"name\":\"write1\",\"write\":{\"type\":\"stdout\"}}]"`
+`./flowlogs-pipeline --pipeline "[{\"name\":\"ingest1\"},{\"follows\":\"ingest1\",\"name\":\"write1\"}]" --parameters "[{\"ingest\":{\"file\":{\"filename\":\"hack/examples/ocp-ipfix-flowlogs.json\"},\"decoder\":{\"type\":\"json\"},\"type\":\"file\"},\"name\":\"ingest1\"},{\"name\":\"write1\",\"write\":{\"type\":\"stdout\"}}]"`

Options included in the command line override the options specified in the config file.

-`flowlogs-pipeline --log-level debug --pipeline "[{\"name\":\"ingest1\"},{\"follows\":\"ingest1\",\"name\":\"decode1\"},{\"follows\":\"decode1\",\"name\":\"write1\"}]" --config <configFile>`
+`flowlogs-pipeline --log-level debug --pipeline "[{\"name\":\"ingest1\"},{\"follows\":\"ingest1\",\"name\":\"write1\"}]" --config <configFile>`

3. TODO: environment variables

4 changes: 2 additions & 2 deletions contrib/benchmarks/baseline/config.yaml
@@ -4,8 +4,8 @@ pipeline:
     type: file
     file:
       filename: ../../contrib/benchmarks/baseline/log-lines.json
-  decode:
-    type: json
+    decoder:
+      type: json
   encode:
     type: none
   extract:
4 changes: 2 additions & 2 deletions contrib/benchmarks/transform/config.yaml
@@ -4,8 +4,8 @@ pipeline:
     type: file
     file:
       filename: ../../contrib/benchmarks/baseline/log-lines.json
-  decode:
-    type: json
+    decoder:
+      type: json
   encode:
     type: none
   extract:
5 changes: 0 additions & 5 deletions contrib/kubernetes/flowlogs-pipeline.conf.yaml
@@ -8,9 +8,6 @@ parameters:
     portLegacy: 2056
   type: collector
   name: ingest_collector
-- decode:
-    type: json
-  name: decode_json
 - name: transform_generic
   transform:
     generic:
@@ -328,8 +325,6 @@ parameters:
 pipeline:
 - name: ingest_collector
 - follows: ingest_collector
-  name: decode_json
-- follows: decode_json
   name: transform_generic
 - follows: transform_generic
   name: transform_network
2 changes: 0 additions & 2 deletions hack/deploy-and-monitor-k8s-network-workload.sh
@@ -31,8 +31,6 @@ pipeline:
       port: 2055
       portLegacy: 2056
     type: collector
-  decode:
-    type: json
   encode:
     type: none
   extract:
4 changes: 2 additions & 2 deletions hack/examples/ocp-ipfix-config.yaml
@@ -3,7 +3,7 @@ pipeline:
     type: file
     file:
       filename: hack/examples/ocp-ipfix-flowlogs.json
-  decode:
-    type: json
+    decoder:
+      type: json
   transform:
     type: generic
10 changes: 1 addition & 9 deletions pkg/confgen/flowlogs2metrics_config.go
@@ -29,11 +29,8 @@ func (cg *ConfGen) generateFlowlogs2PipelineConfig(fileName string) error {
 		"log-level": "error",
 		"pipeline": []map[string]string{
 			{"name": "ingest_collector"},
-			{"name": "decode_json",
-				"follows": "ingest_collector",
-			},
 			{"name": "transform_generic",
-				"follows": "decode_json",
+				"follows": "ingest_collector",
 			},
 			{"name": "transform_network",
 				"follows": "transform_generic",
@@ -59,11 +56,6 @@ func (cg *ConfGen) generateFlowlogs2PipelineConfig(fileName string) error {
 				},
 			},
 		},
-		{"name": "decode_json",
-			"decode": map[string]interface{}{
-				"type": "json",
-			},
-		},
 		{"name": "transform_generic",
 			"transform": map[string]interface{}{
 				"type": "generic",
16 changes: 4 additions & 12 deletions pkg/pipeline/ingest/ingest_kafka_test.go
@@ -18,7 +18,6 @@
 package ingest
 
 import (
-	"encoding/json"
 	"testing"
 	"time"
 
@@ -126,16 +125,9 @@ func Test_IngestKafka(t *testing.T) {
 	receivedEntries := <-ingestOutput
 
 	require.Equal(t, 3, len(receivedEntries))
-	require.Equal(t, toMap(t, record1), receivedEntries[0])
-	require.Equal(t, toMap(t, record2), receivedEntries[1])
-	require.Equal(t, toMap(t, record3), receivedEntries[2])
-}
-
-func toMap(t *testing.T, in string) config.GenericMap {
-	var m config.GenericMap
-	err := json.Unmarshal([]byte(in), &m)
-	require.NoError(t, err)
-	return m
+	require.Equal(t, test.DeserializeJSONToMap(t, record1), receivedEntries[0])
+	require.Equal(t, test.DeserializeJSONToMap(t, record2), receivedEntries[1])
+	require.Equal(t, test.DeserializeJSONToMap(t, record3), receivedEntries[2])
 }

type fakeKafkaReader struct {
@@ -184,5 +176,5 @@ func Test_KafkaListener(t *testing.T) {
 	receivedEntries := <-ingestOutput
 
 	require.Equal(t, 1, len(receivedEntries))
-	require.Equal(t, toMap(t, string(fakeRecord)), receivedEntries[0])
+	require.Equal(t, test.DeserializeJSONToMap(t, string(fakeRecord)), receivedEntries[0])
 }
2 changes: 0 additions & 2 deletions pkg/pipeline/pipeline_builder.go
@@ -6,7 +6,6 @@ import (
 
 	"github.com/netobserv/flowlogs-pipeline/pkg/api"
 	"github.com/netobserv/flowlogs-pipeline/pkg/config"
-	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/decode"
 	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode"
 	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract"
 	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest"
@@ -45,7 +44,6 @@ type pipelineEntry struct {
 	stageName   string
 	stageType   string
 	Ingester    ingest.Ingester
-	Decoder     decode.Decoder
 	Transformer transform.Transformer
 	Extractor   extract.Extractor
 	Encoder     encode.Encoder
Expand Down
9 changes: 3 additions & 6 deletions pkg/test/e2e/kafka/flp-config.yaml
@@ -6,10 +6,8 @@ data:
   flowlogs-pipeline.conf.yaml: |
     pipeline:
       - name: kafka_ingest
-      - name: decode_json
-        follows: kafka_ingest
       - name: transform_none
-        follows: decode_json
+        follows: kafka_ingest
       - name: kafka_encode
         follows: transform_none
     parameters:
@@ -20,9 +18,8 @@ data:
             brokers: [my-cluster-kafka-bootstrap.default.svc:9092]
             topic: test_topic_in
             groupid: group_test_in
-      - name: decode_json
-        decode:
-          type: json
+          decoder:
+            type: json
       - name: transform_none
         transform:
           type: none
7 changes: 2 additions & 5 deletions pkg/test/e2e/pipline/flp-config.yaml
@@ -11,11 +11,10 @@ data:
           hostname: 0.0.0.0
           port: 2055
           portLegacy: 2056
+        decoder:
+          type: json
         type: collector
       name: ingest_collector
-    - decode:
-        type: json
-      name: decode_json
     - name: transform_generic
       transform:
         generic:
@@ -288,8 +287,6 @@ data:
     pipeline:
     - name: ingest_collector
     - follows: ingest_collector
-      name: decode_json
-    - follows: decode_json
       name: transform_generic
     - follows: transform_generic
       name: transform_network
8 changes: 8 additions & 0 deletions pkg/test/utils.go
@@ -19,6 +19,7 @@ package test
 
 import (
 	"bytes"
+	"encoding/json"
 	"fmt"
 	"os"
 	"os/exec"
@@ -155,3 +156,10 @@ func RunCommand(command string) string {
 	fmt.Printf("output = %s\n", string(output))
 	return string(output)
 }
+
+func DeserializeJSONToMap(t *testing.T, in string) config.GenericMap {
+	var m config.GenericMap
+	err := json.Unmarshal([]byte(in), &m)
+	require.NoError(t, err)
+	return m
+}
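The new `DeserializeJSONToMap` helper shared by the tests simply unmarshals one JSON record into a generic map and fails the test on malformed input. Assuming `config.GenericMap` is a plain map from field name to value, its behavior can be sketched standalone (error return in place of `*testing.T`):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// GenericMap stands in for config.GenericMap, assumed here to be a
// plain map from flow field name to value.
type GenericMap map[string]interface{}

// deserializeJSONToMap mirrors the test helper: parse one JSON record
// into a GenericMap, returning an error instead of failing a test.
func deserializeJSONToMap(in string) (GenericMap, error) {
	var m GenericMap
	if err := json.Unmarshal([]byte(in), &m); err != nil {
		return nil, err
	}
	return m, nil
}

func main() {
	m, err := deserializeJSONToMap(`{"SrcAddr":"10.0.0.1","Bytes":304}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(m["SrcAddr"]) // prints 10.0.0.1
}
```

Note that `encoding/json` decodes JSON numbers into `float64` when the target is `interface{}`, which is why comparisons in such tests go through the same helper on both sides.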
33 changes: 0 additions & 33 deletions playground/aws1.yml

This file was deleted.

5 changes: 0 additions & 5 deletions playground/aws1_input.txt

This file was deleted.

17 changes: 0 additions & 17 deletions playground/aws2.yml

This file was deleted.

5 changes: 0 additions & 5 deletions playground/aws2_input.txt

This file was deleted.

40 changes: 0 additions & 40 deletions playground/aws3.yml

This file was deleted.

6 changes: 0 additions & 6 deletions playground/aws3_input.txt

This file was deleted.

