Skip to content

Commit

Permalink
Remove decode stages, rename stages
Browse files Browse the repository at this point in the history
Following netobserv/flowlogs-pipeline#225, decode stages should not be set after ingesting IPFIX or gRPC

Also rename stages to make them more explicit
  • Loading branch information
jotak committed Jun 16, 2022
1 parent fb59e57 commit d59e060
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 14 deletions.
12 changes: 5 additions & 7 deletions controllers/flowlogspipeline/flp_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,29 +262,27 @@ func (b *builder) addTransformStages(lastStage *config.PipelineBuilderStage) {
func (b *builder) buildPipelineConfig() ([]config.Stage, []config.StageParam) {
var pipeline config.PipelineBuilderStage
if b.confKind == ConfKafkaTransformer {
pipeline = config.NewKafkaPipeline("ingest", api.IngestKafka{
pipeline = config.NewKafkaPipeline("kafka-read", api.IngestKafka{
Brokers: []string{b.desiredKafka.Address},
Topic: b.desiredKafka.Topic,
GroupId: b.confKind, // Without groupid, each message is delivered to each consumer
Decoder: api.Decoder{Type: "json"},
})
pipeline = pipeline.DecodeJSON("decode")
} else if b.portProtocol == corev1.ProtocolUDP {
// UDP Port: IPFIX collector with JSON decoder
pipeline = config.NewCollectorPipeline("ingest", api.IngestCollector{
pipeline = config.NewCollectorPipeline("ipfix", api.IngestCollector{
Port: int(b.desired.Port),
HostName: "0.0.0.0",
})
pipeline = pipeline.DecodeJSON("decode")
} else {
// TCP Port: GRPC collector (eBPF agent) with Protobuf decoder
pipeline = config.NewGRPCPipeline("ingest", api.IngestGRPCProto{
pipeline = config.NewGRPCPipeline("grpc", api.IngestGRPCProto{
Port: int(b.desired.Port),
})
pipeline = pipeline.DecodeProtobuf("decode")
}

if b.confKind == ConfKafkaIngester {
pipeline = pipeline.EncodeKafka("kafka", api.EncodeKafka{
pipeline = pipeline.EncodeKafka("kafka-write", api.EncodeKafka{
Address: b.desiredKafka.Address,
Topic: b.desiredKafka.Topic,
})
Expand Down
14 changes: 7 additions & 7 deletions controllers/flowlogspipeline/flp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,10 @@ func TestConfigMapShouldDeserializeAsJSON(t *testing.T) {
assert.Equal("trace", decoded.LogLevel)

params := decoded.Parameters
assert.Len(params, 7)
assert.Len(params, 6)
assert.Equal(flp.Port, int32(params[0].Ingest.Collector.Port))

lokiCfg := params[3].Write.Loki
lokiCfg := params[2].Write.Loki
assert.Equal(loki.URL, lokiCfg.URL)
assert.Equal(loki.BatchWait.Duration.String(), lokiCfg.BatchWait)
assert.Equal(loki.MinBackoff.Duration.String(), lokiCfg.MinBackoff)
Expand All @@ -396,7 +396,7 @@ func TestConfigMapShouldDeserializeAsJSON(t *testing.T) {
assert.EqualValues([]string{"SrcK8S_Namespace", "SrcK8S_OwnerName", "DstK8S_Namespace", "DstK8S_OwnerName", "FlowDirection"}, lokiCfg.Labels)
assert.Equal(`{app="netobserv-flowcollector"}`, fmt.Sprintf("%v", lokiCfg.StaticLabels))

assert.Equal(flp.PrometheusPort, int32(params[6].Encode.Prom.Port))
assert.Equal(flp.PrometheusPort, int32(params[5].Encode.Prom.Port))

}

Expand Down Expand Up @@ -519,22 +519,22 @@ func TestPipelineConfig(t *testing.T) {
stages, parameters := b.buildPipelineConfig()
assert.True(validatePipelineConfig(stages, parameters))
jsonStages, _ := json.Marshal(stages)
assert.Equal(`[{"name":"ingest"},{"name":"decode","follows":"ingest"},{"name":"enrich","follows":"decode"},{"name":"loki","follows":"enrich"},{"name":"aggregate","follows":"enrich"},{"name":"prometheus","follows":"aggregate"}]`, string(jsonStages))
assert.Equal(`[{"name":"ipfix"},{"name":"enrich","follows":"ipfix"},{"name":"loki","follows":"enrich"},{"name":"aggregate","follows":"enrich"},{"name":"prometheus","follows":"aggregate"}]`, string(jsonStages))

// Kafka Ingester
kafka.Enable = true
b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfKafkaIngester, true)
stages, parameters = b.buildPipelineConfig()
assert.True(validatePipelineConfig(stages, parameters))
jsonStages, _ = json.Marshal(stages)
assert.Equal(`[{"name":"ingest"},{"name":"decode","follows":"ingest"},{"name":"kafka","follows":"decode"}]`, string(jsonStages))
assert.Equal(`[{"name":"ipfix"},{"name":"kafka-write","follows":"ipfix"}]`, string(jsonStages))

// Kafka Transformer
b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, &kafka, ConfKafkaTransformer, true)
stages, parameters = b.buildPipelineConfig()
assert.True(validatePipelineConfig(stages, parameters))
jsonStages, _ = json.Marshal(stages)
assert.Equal(`[{"name":"ingest"},{"name":"decode","follows":"ingest"},{"name":"enrich","follows":"decode"},{"name":"loki","follows":"enrich"},{"name":"aggregate","follows":"enrich"},{"name":"prometheus","follows":"aggregate"}]`, string(jsonStages))
assert.Equal(`[{"name":"kafka-read"},{"name":"enrich","follows":"kafka-read"},{"name":"loki","follows":"enrich"},{"name":"aggregate","follows":"enrich"},{"name":"prometheus","follows":"aggregate"}]`, string(jsonStages))
}

func TestPipelineTraceStage(t *testing.T) {
Expand All @@ -546,5 +546,5 @@ func TestPipelineTraceStage(t *testing.T) {
stages, parameters := b.buildPipelineConfig()
assert.True(validatePipelineConfig(stages, parameters))
jsonStages, _ := json.Marshal(stages)
assert.Equal(`[{"name":"ingest"},{"name":"decode","follows":"ingest"},{"name":"enrich","follows":"decode"},{"name":"loki","follows":"enrich"},{"name":"stdout","follows":"enrich"},{"name":"aggregate","follows":"enrich"},{"name":"prometheus","follows":"aggregate"}]`, string(jsonStages))
assert.Equal(`[{"name":"ipfix"},{"name":"enrich","follows":"ipfix"},{"name":"loki","follows":"enrich"},{"name":"stdout","follows":"enrich"},{"name":"aggregate","follows":"enrich"},{"name":"prometheus","follows":"aggregate"}]`, string(jsonStages))
}

0 comments on commit d59e060

Please sign in to comment.