From 9e05c62fba67a0e1e1d10b6965cfbf0a4179a1da Mon Sep 17 00:00:00 2001 From: Kalman Meth Date: Thu, 28 Apr 2022 17:02:00 +0300 Subject: [PATCH 1/3] made encode a terminal stage --- .../kubernetes/flowlogs-pipeline.conf.yaml | 5 -- pkg/confgen/flowlogs2metrics_config.go | 8 --- pkg/pipeline/aggregate_prom_test.go | 4 +- pkg/pipeline/encode/encode.go | 11 ++-- pkg/pipeline/encode/encode_kafka.go | 5 +- pkg/pipeline/encode/encode_prom.go | 52 ++++++++++--------- pkg/pipeline/encode/encode_prom_test.go | 14 ++--- pkg/pipeline/encode/encode_test.go | 9 ++-- pkg/pipeline/pipeline_builder.go | 4 +- pkg/test/e2e/kafka/flp-config.yaml | 5 -- pkg/test/e2e/pipline/flp-config.yaml | 5 -- 11 files changed, 50 insertions(+), 72 deletions(-) diff --git a/contrib/kubernetes/flowlogs-pipeline.conf.yaml b/contrib/kubernetes/flowlogs-pipeline.conf.yaml index f7d5025ef..5be3f62cd 100644 --- a/contrib/kubernetes/flowlogs-pipeline.conf.yaml +++ b/contrib/kubernetes/flowlogs-pipeline.conf.yaml @@ -304,9 +304,6 @@ parameters: prefix: flp_ type: prom name: encode_prom -- name: write_none - write: - type: none - name: write_loki write: loki: @@ -326,8 +323,6 @@ pipeline: name: extract_aggregate - follows: extract_aggregate name: encode_prom -- follows: encode_prom - name: write_none - follows: transform_network name: write_loki diff --git a/pkg/confgen/flowlogs2metrics_config.go b/pkg/confgen/flowlogs2metrics_config.go index cc94f10a7..8b9e94877 100644 --- a/pkg/confgen/flowlogs2metrics_config.go +++ b/pkg/confgen/flowlogs2metrics_config.go @@ -44,9 +44,6 @@ func (cg *ConfGen) generateFlowlogs2PipelineConfig(fileName string) error { {"name": "encode_prom", "follows": "extract_aggregate", }, - {"name": "write_none", - "follows": "encode_prom", - }, {"name": "write_loki", "follows": "transform_network", }, @@ -99,11 +96,6 @@ func (cg *ConfGen) generateFlowlogs2PipelineConfig(fileName string) error { }, }, }, - {"name": "write_none", - "write": map[string]interface{}{ - "type": "none", - }, - }, {"name": "write_loki", "write": map[string]interface{}{ "type": cg.config.Write.Type, diff --git a/pkg/pipeline/aggregate_prom_test.go b/pkg/pipeline/aggregate_prom_test.go index 188edeba7..27f303e92 100644 --- a/pkg/pipeline/aggregate_prom_test.go +++ b/pkg/pipeline/aggregate_prom_test.go @@ -173,8 +173,8 @@ parameters: // we use ElementsMatch() rather than Equals() require.ElementsMatch(t, tt.expectedAggs, actualAggs) - actualEncode := promEncode.Encode(actualAggs) - require.ElementsMatch(t, tt.expectedEncode, actualEncode) + promEncode.Encode(actualAggs) + require.ElementsMatch(t, tt.expectedEncode, promEncode.(*encode.EncodeProm).PrevRecords) }) } } diff --git a/pkg/pipeline/encode/encode.go b/pkg/pipeline/encode/encode.go index 4d0a423dc..62cb90804 100644 --- a/pkg/pipeline/encode/encode.go +++ b/pkg/pipeline/encode/encode.go @@ -23,19 +23,22 @@ import ( ) type encodeNone struct { + prevRecords []config.GenericMap } type Encoder interface { - Encode(in []config.GenericMap) []config.GenericMap + Encode(in []config.GenericMap) } // Encode encodes a flow before being stored -func (t *encodeNone) Encode(in []config.GenericMap) []config.GenericMap { - return in +func (t *encodeNone) Encode(in []config.GenericMap) { + t.prevRecords = in } // NewEncodeNone create a new encode func NewEncodeNone() (Encoder, error) { log.Debugf("entering NewEncodeNone") - return &encodeNone{}, nil + return &encodeNone{ + prevRecords: make([]config.GenericMap, 0), + }, nil } diff --git a/pkg/pipeline/encode/encode_kafka.go b/pkg/pipeline/encode/encode_kafka.go index ba9685150..fb587684c 100644 --- a/pkg/pipeline/encode/encode_kafka.go +++ b/pkg/pipeline/encode/encode_kafka.go @@ -43,11 +43,10 @@ type encodeKafka struct { } // Encode writes entries to kafka topic -func (r *encodeKafka) Encode(in []config.GenericMap) []config.GenericMap { +func (r *encodeKafka) Encode(in []config.GenericMap) { log.Debugf("entering encodeKafka Encode, #items = %d", len(in)) var msgs []kafkago.Message msgs = make([]kafkago.Message, 0) - out := make([]config.GenericMap, 0) for _, entry := range in { var entryByteArray []byte entryByteArray, _ = json.Marshal(entry) @@ -55,13 +54,11 @@ func (r *encodeKafka) Encode(in []config.GenericMap) []config.GenericMap { Value: entryByteArray, } msgs = append(msgs, msg) - out = append(out, entry) } err := r.kafkaWriter.WriteMessages(context.Background(), msgs...) if err != nil { log.Errorf("encodeKafka error: %v", err) } - return out } // NewEncodeKafka create a new writer to kafka diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 1c5f3c6b2..1c6183fb2 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -74,15 +74,16 @@ type metricCacheEntry struct { type metricCache map[string]*metricCacheEntry -type encodeProm struct { - mu sync.Mutex - port string - prefix string - metrics map[string]metricInfo - expiryTime int64 - mList *list.List - mCache metricCache - exitChan <-chan struct{} +type EncodeProm struct { + mu sync.Mutex + port string + prefix string + metrics map[string]metricInfo + expiryTime int64 + mList *list.List + mCache metricCache + exitChan <-chan struct{} + PrevRecords []config.GenericMap } var metricsProcessed = operationalMetrics.NewCounter(prometheus.CounterOpts{ @@ -91,8 +92,8 @@ var metricsProcessed = operationalMetrics.NewCounter(prometheus.CounterOpts{ }) // Encode encodes a metric before being stored -func (e *encodeProm) Encode(metrics []config.GenericMap) []config.GenericMap { - log.Debugf("entering encodeProm Encode") +func (e *EncodeProm) Encode(metrics []config.GenericMap) { + log.Debugf("entering EncodeProm Encode") e.mu.Lock() defer e.mu.Unlock() out := make([]config.GenericMap, 0) @@ -102,13 +103,13 @@ func (e *encodeProm) Encode(metrics []config.GenericMap) []config.GenericMap { metricOut := e.EncodeMetric(metric) out = append(out, metricOut...) } + e.PrevRecords = out log.Debugf("out = %v", out) log.Debugf("cache = %v", e.mCache) log.Debugf("list = %v", e.mList) - return out } -func (e *encodeProm) EncodeMetric(metricRecord config.GenericMap) []config.GenericMap { +func (e *EncodeProm) EncodeMetric(metricRecord config.GenericMap) []config.GenericMap { log.Debugf("entering EncodeMetric. metricRecord = %v", metricRecord) out := make([]config.GenericMap, 0) for metricName, mInfo := range e.metrics { @@ -183,7 +184,7 @@ func generateCacheKey(sig *entrySignature) string { return eInfoString } -func (e *encodeProm) saveEntryInCache(entry entryInfo, entryLabels map[string]string) *metricCacheEntry { +func (e *EncodeProm) saveEntryInCache(entry entryInfo, entryLabels map[string]string) *metricCacheEntry { // save item in cache; use eInfo as key to the cache var cEntry *metricCacheEntry nowInSecs := time.Now().Unix() @@ -210,7 +211,7 @@ func (e *encodeProm) saveEntryInCache(entry entryInfo, entryLabels map[string]st return cEntry } -func (e *encodeProm) cleanupExpiredEntriesLoop() { +func (e *EncodeProm) cleanupExpiredEntriesLoop() { ticker := time.NewTicker(time.Duration(e.expiryTime) * time.Second) for { select { @@ -224,7 +225,7 @@ func (e *encodeProm) cleanupExpiredEntriesLoop() { } // cleanupExpiredEntries - any entry that has expired should be removed from the prometheus reporting and cache -func (e *encodeProm) cleanupExpiredEntries() { +func (e *EncodeProm) cleanupExpiredEntries() { log.Debugf("entering cleanupExpiredEntries") e.mu.Lock() defer e.mu.Unlock() @@ -262,7 +263,7 @@ func (e *encodeProm) cleanupExpiredEntries() { } // startPrometheusInterface listens for prometheus resource usage requests -func startPrometheusInterface(w *encodeProm) { +func startPrometheusInterface(w *EncodeProm) { log.Debugf("entering startPrometheusInterface") log.Infof("startPrometheusInterface: port num = %s", w.port) @@ -337,14 +338,15 @@ func NewEncodeProm(params config.StageParam) (Encoder, error) { } log.Debugf("metrics = %v", metrics) - w := &encodeProm{ - port: fmt.Sprintf(":%v", portNum), - prefix: promPrefix, - metrics: metrics, - expiryTime: expiryTime, - mList: list.New(), - mCache: make(metricCache), - exitChan: utils.ExitChannel(), + w := &EncodeProm{ + port: fmt.Sprintf(":%v", portNum), + prefix: promPrefix, + metrics: metrics, + expiryTime: expiryTime, + mList: list.New(), + mCache: make(metricCache), + exitChan: utils.ExitChannel(), + PrevRecords: make([]config.GenericMap, 0), } go startPrometheusInterface(w) go w.cleanupExpiredEntriesLoop() diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/encode/encode_prom_test.go index 631e388d9..ca8bd2491 100644 --- a/pkg/pipeline/encode/encode_prom_test.go +++ b/pkg/pipeline/encode/encode_prom_test.go @@ -75,7 +75,7 @@ func initNewEncodeProm(t *testing.T) Encoder { func Test_NewEncodeProm(t *testing.T) { newEncode := initNewEncodeProm(t) - encodeProm := newEncode.(*encodeProm) + encodeProm := newEncode.(*EncodeProm) require.Equal(t, ":9103", encodeProm.port) require.Equal(t, "test_", encodeProm.prefix) require.Equal(t, 3, len(encodeProm.metrics)) @@ -95,7 +95,7 @@ func Test_NewEncodeProm(t *testing.T) { require.Equal(t, cInfo.labelNames, expectedList) entry := test.GetExtractMockEntry() input := []config.GenericMap{entry} - output := encodeProm.Encode(input) + encodeProm.Encode(input) entryLabels1 := make(map[string]string, 3) entryLabels2 := make(map[string]string, 3) @@ -115,8 +115,8 @@ func Test_NewEncodeProm(t *testing.T) { "Labels": entryLabels2, "value": 34, } - require.Contains(t, output, gEntryInfo1) - require.Contains(t, output, gEntryInfo2) + require.Contains(t, encodeProm.PrevRecords, gEntryInfo1) + require.Contains(t, encodeProm.PrevRecords, gEntryInfo2) gaugeA, err := gInfo.promGauge.GetMetricWith(entryLabels1) require.Equal(t, nil, err) bytesA := testutil.ToFloat64(gaugeA) @@ -157,7 +157,7 @@ func Test_EncodeAggregate(t *testing.T) { "count": 1, }} - newEncode := &encodeProm{ + newEncode := &EncodeProm{ port: ":0000", prefix: "test_", metrics: map[string]metricInfo{ @@ -174,7 +174,7 @@ func Test_EncodeAggregate(t *testing.T) { mCache: make(metricCache), } - output := newEncode.Encode(metrics) + newEncode.Encode(metrics) gEntryInfo1 := config.GenericMap{ "Name": "test_gauge", @@ -186,5 +186,5 @@ func Test_EncodeAggregate(t *testing.T) { } expectedOutput := []config.GenericMap{gEntryInfo1} - require.Equal(t, expectedOutput, output) + require.Equal(t, expectedOutput, newEncode.PrevRecords) } diff --git a/pkg/pipeline/encode/encode_test.go b/pkg/pipeline/encode/encode_test.go index b75a0d0fd..6cd05dab0 100644 --- a/pkg/pipeline/encode/encode_test.go +++ b/pkg/pipeline/encode/encode_test.go @@ -44,13 +44,12 @@ func TestEncodeNone(t *testing.T) { "varbool": false, } map3 := config.GenericMap{} - var out []config.GenericMap var in []config.GenericMap - out = encodeNone.Encode(in) - require.Equal(t, 0, len(out)) + encodeNone.Encode(in) + require.Equal(t, 0, len(encodeNone.prevRecords)) in = append(in, map1) in = append(in, map2) in = append(in, map3) - out = encodeNone.Encode(in) - require.Equal(t, len(in), len(out)) + encodeNone.Encode(in) + require.Equal(t, len(in), len(encodeNone.prevRecords)) } diff --git a/pkg/pipeline/pipeline_builder.go b/pkg/pipeline/pipeline_builder.go index 3de0c742b..993fe98d9 100644 --- a/pkg/pipeline/pipeline_builder.go +++ b/pkg/pipeline/pipeline_builder.go @@ -236,9 +236,9 @@ func (b *builder) getStageNode(pe *pipelineEntry, stageID string) (interface{}, } }) case StageEncode: - stage = node.AsMiddle(func(in <-chan []config.GenericMap, out chan<- []config.GenericMap) { + stage = node.AsTerminal(func(in <-chan []config.GenericMap) { for i := range in { - out <- pe.Encoder.Encode(i) + pe.Encoder.Encode(i) } }) case StageTransform: diff --git a/pkg/test/e2e/kafka/flp-config.yaml b/pkg/test/e2e/kafka/flp-config.yaml index 2a387d12a..44e70f33c 100644 --- a/pkg/test/e2e/kafka/flp-config.yaml +++ b/pkg/test/e2e/kafka/flp-config.yaml @@ -12,8 +12,6 @@ data: follows: decode_json - name: kafka_encode follows: transform_none - - name: write_none - follows: kafka_encode parameters: - name: kafka_ingest ingest: @@ -34,6 +32,3 @@ data: kafka: address: my-cluster-kafka-bootstrap.default.svc:9092 topic: test_topic_out - - name: write_none - write: - type: none diff --git a/pkg/test/e2e/pipline/flp-config.yaml b/pkg/test/e2e/pipline/flp-config.yaml index 2cd449b3a..322e86aa2 100644 --- a/pkg/test/e2e/pipline/flp-config.yaml +++ b/pkg/test/e2e/pipline/flp-config.yaml @@ -277,9 +277,6 @@ data: prefix: flp_ type: prom name: encode_prom - - name: write_none - write: - type: none - name: write_loki write: loki: @@ -299,7 +296,5 @@ data: name: extract_aggregate - follows: extract_aggregate name: encode_prom - - follows: encode_prom - name: write_none - follows: transform_network name: write_loki From d8a54c1cd8e14d0f795ea81ed75d312ab9aca3ff Mon Sep 17 00:00:00 2001 From: Kalman Meth Date: Sun, 1 May 2022 11:42:28 +0300 Subject: [PATCH 2/3] saved prev records for testing purposes --- pkg/pipeline/encode/encode_kafka.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/pipeline/encode/encode_kafka.go b/pkg/pipeline/encode/encode_kafka.go index fb587684c..5d1d635a2 100644 --- a/pkg/pipeline/encode/encode_kafka.go +++ b/pkg/pipeline/encode/encode_kafka.go @@ -40,6 +40,7 @@ type kafkaWriteMessage interface { type encodeKafka struct { kafkaParams api.EncodeKafka kafkaWriter kafkaWriteMessage + prevRecords []config.GenericMap } // Encode writes entries to kafka topic @@ -59,6 +60,7 @@ func (r *encodeKafka) Encode(in []config.GenericMap) { if err != nil { log.Errorf("encodeKafka error: %v", err) } + r.prevRecords = in } // NewEncodeKafka create a new writer to kafka From c71e9ed8313d0ae003afa37a55608d51ec68674a Mon Sep 17 00:00:00 2001 From: Kalman Meth Date: Sun, 1 May 2022 11:42:48 +0300 Subject: [PATCH 3/3] made encode stage terminal --- pkg/pipeline/pipeline_builder.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/pipeline/pipeline_builder.go b/pkg/pipeline/pipeline_builder.go index 993fe98d9..2e3945e56 100644 --- a/pkg/pipeline/pipeline_builder.go +++ b/pkg/pipeline/pipeline_builder.go @@ -206,7 +206,7 @@ func isReceptor(p *pipelineEntry) bool { } func isSender(p *pipelineEntry) bool { - return p.stageType != StageWrite + return p.stageType != StageWrite && p.stageType != StageEncode } func (b *builder) getStageNode(pe *pipelineEntry, stageID string) (interface{}, error) { @@ -236,11 +236,13 @@ func (b *builder) getStageNode(pe *pipelineEntry, stageID string) (interface{}, } }) case StageEncode: - stage = node.AsTerminal(func(in <-chan []config.GenericMap) { + encode := node.AsTerminal(func(in <-chan []config.GenericMap) { for i := range in { pe.Encoder.Encode(i) } }) + b.terminalNodes = append(b.terminalNodes, encode) + stage = encode case StageTransform: stage = node.AsMiddle(func(in <-chan []config.GenericMap, out chan<- []config.GenericMap) { for i := range in {