Skip to content

Commit

Permalink
Merge pull request #194 from KalmanMeth/prom-final
Browse files Browse the repository at this point in the history
made encode a terminal stage
  • Loading branch information
Mario Macias committed May 4, 2022
2 parents 251f96a + c71e9ed commit fdd8ca2
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 73 deletions.
5 changes: 0 additions & 5 deletions contrib/kubernetes/flowlogs-pipeline.conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,6 @@ parameters:
prefix: flp_
type: prom
name: encode_prom
- name: write_none
write:
type: none
- name: write_loki
write:
loki:
Expand All @@ -326,8 +323,6 @@ pipeline:
name: extract_aggregate
- follows: extract_aggregate
name: encode_prom
- follows: encode_prom
name: write_none
- follows: transform_network
name: write_loki

8 changes: 0 additions & 8 deletions pkg/confgen/flowlogs2metrics_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ func (cg *ConfGen) generateFlowlogs2PipelineConfig(fileName string) error {
{"name": "encode_prom",
"follows": "extract_aggregate",
},
{"name": "write_none",
"follows": "encode_prom",
},
{"name": "write_loki",
"follows": "transform_network",
},
Expand Down Expand Up @@ -99,11 +96,6 @@ func (cg *ConfGen) generateFlowlogs2PipelineConfig(fileName string) error {
},
},
},
{"name": "write_none",
"write": map[string]interface{}{
"type": "none",
},
},
{"name": "write_loki",
"write": map[string]interface{}{
"type": cg.config.Write.Type,
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/aggregate_prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ parameters:
// we use ElementsMatch() rather than Equals()
require.ElementsMatch(t, tt.expectedAggs, actualAggs)

actualEncode := promEncode.Encode(actualAggs)
require.ElementsMatch(t, tt.expectedEncode, actualEncode)
promEncode.Encode(actualAggs)
require.ElementsMatch(t, tt.expectedEncode, promEncode.(*encode.EncodeProm).PrevRecords)
})
}
}
11 changes: 7 additions & 4 deletions pkg/pipeline/encode/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,22 @@ import (
)

type encodeNone struct {
prevRecords []config.GenericMap
}

type Encoder interface {
Encode(in []config.GenericMap) []config.GenericMap
Encode(in []config.GenericMap)
}

// Encode encodes a flow before being stored
func (t *encodeNone) Encode(in []config.GenericMap) []config.GenericMap {
return in
func (t *encodeNone) Encode(in []config.GenericMap) {
t.prevRecords = in
}

// NewEncodeNone create a new encode
func NewEncodeNone() (Encoder, error) {
log.Debugf("entering NewEncodeNone")
return &encodeNone{}, nil
return &encodeNone{
prevRecords: make([]config.GenericMap, 0),
}, nil
}
7 changes: 3 additions & 4 deletions pkg/pipeline/encode/encode_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,28 +40,27 @@ type kafkaWriteMessage interface {
type encodeKafka struct {
kafkaParams api.EncodeKafka
kafkaWriter kafkaWriteMessage
prevRecords []config.GenericMap
}

// Encode writes entries to kafka topic
func (r *encodeKafka) Encode(in []config.GenericMap) []config.GenericMap {
func (r *encodeKafka) Encode(in []config.GenericMap) {
log.Debugf("entering encodeKafka Encode, #items = %d", len(in))
var msgs []kafkago.Message
msgs = make([]kafkago.Message, 0)
out := make([]config.GenericMap, 0)
for _, entry := range in {
var entryByteArray []byte
entryByteArray, _ = json.Marshal(entry)
msg := kafkago.Message{
Value: entryByteArray,
}
msgs = append(msgs, msg)
out = append(out, entry)
}
err := r.kafkaWriter.WriteMessages(context.Background(), msgs...)
if err != nil {
log.Errorf("encodeKafka error: %v", err)
}
return out
r.prevRecords = in
}

// NewEncodeKafka create a new writer to kafka
Expand Down
52 changes: 27 additions & 25 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,16 @@ type metricCacheEntry struct {

type metricCache map[string]*metricCacheEntry

type encodeProm struct {
mu sync.Mutex
port string
prefix string
metrics map[string]metricInfo
expiryTime int64
mList *list.List
mCache metricCache
exitChan <-chan struct{}
type EncodeProm struct {
mu sync.Mutex
port string
prefix string
metrics map[string]metricInfo
expiryTime int64
mList *list.List
mCache metricCache
exitChan <-chan struct{}
PrevRecords []config.GenericMap
}

var metricsProcessed = operationalMetrics.NewCounter(prometheus.CounterOpts{
Expand All @@ -91,8 +92,8 @@ var metricsProcessed = operationalMetrics.NewCounter(prometheus.CounterOpts{
})

// Encode encodes a metric before being stored
func (e *encodeProm) Encode(metrics []config.GenericMap) []config.GenericMap {
log.Debugf("entering encodeProm Encode")
func (e *EncodeProm) Encode(metrics []config.GenericMap) {
log.Debugf("entering EncodeProm Encode")
e.mu.Lock()
defer e.mu.Unlock()
out := make([]config.GenericMap, 0)
Expand All @@ -102,13 +103,13 @@ func (e *encodeProm) Encode(metrics []config.GenericMap) []config.GenericMap {
metricOut := e.EncodeMetric(metric)
out = append(out, metricOut...)
}
e.PrevRecords = out
log.Debugf("out = %v", out)
log.Debugf("cache = %v", e.mCache)
log.Debugf("list = %v", e.mList)
return out
}

func (e *encodeProm) EncodeMetric(metricRecord config.GenericMap) []config.GenericMap {
func (e *EncodeProm) EncodeMetric(metricRecord config.GenericMap) []config.GenericMap {
log.Debugf("entering EncodeMetric. metricRecord = %v", metricRecord)
out := make([]config.GenericMap, 0)
for metricName, mInfo := range e.metrics {
Expand Down Expand Up @@ -183,7 +184,7 @@ func generateCacheKey(sig *entrySignature) string {
return eInfoString
}

func (e *encodeProm) saveEntryInCache(entry entryInfo, entryLabels map[string]string) *metricCacheEntry {
func (e *EncodeProm) saveEntryInCache(entry entryInfo, entryLabels map[string]string) *metricCacheEntry {
// save item in cache; use eInfo as key to the cache
var cEntry *metricCacheEntry
nowInSecs := time.Now().Unix()
Expand All @@ -210,7 +211,7 @@ func (e *encodeProm) saveEntryInCache(entry entryInfo, entryLabels map[string]st
return cEntry
}

func (e *encodeProm) cleanupExpiredEntriesLoop() {
func (e *EncodeProm) cleanupExpiredEntriesLoop() {
ticker := time.NewTicker(time.Duration(e.expiryTime) * time.Second)
for {
select {
Expand All @@ -224,7 +225,7 @@ func (e *encodeProm) cleanupExpiredEntriesLoop() {
}

// cleanupExpiredEntries - any entry that has expired should be removed from the prometheus reporting and cache
func (e *encodeProm) cleanupExpiredEntries() {
func (e *EncodeProm) cleanupExpiredEntries() {
log.Debugf("entering cleanupExpiredEntries")
e.mu.Lock()
defer e.mu.Unlock()
Expand Down Expand Up @@ -262,7 +263,7 @@ func (e *encodeProm) cleanupExpiredEntries() {
}

// startPrometheusInterface listens for prometheus resource usage requests
func startPrometheusInterface(w *encodeProm) {
func startPrometheusInterface(w *EncodeProm) {
log.Debugf("entering startPrometheusInterface")
log.Infof("startPrometheusInterface: port num = %s", w.port)

Expand Down Expand Up @@ -337,14 +338,15 @@ func NewEncodeProm(params config.StageParam) (Encoder, error) {
}

log.Debugf("metrics = %v", metrics)
w := &encodeProm{
port: fmt.Sprintf(":%v", portNum),
prefix: promPrefix,
metrics: metrics,
expiryTime: expiryTime,
mList: list.New(),
mCache: make(metricCache),
exitChan: utils.ExitChannel(),
w := &EncodeProm{
port: fmt.Sprintf(":%v", portNum),
prefix: promPrefix,
metrics: metrics,
expiryTime: expiryTime,
mList: list.New(),
mCache: make(metricCache),
exitChan: utils.ExitChannel(),
PrevRecords: make([]config.GenericMap, 0),
}
go startPrometheusInterface(w)
go w.cleanupExpiredEntriesLoop()
Expand Down
14 changes: 7 additions & 7 deletions pkg/pipeline/encode/encode_prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func initNewEncodeProm(t *testing.T) Encoder {

func Test_NewEncodeProm(t *testing.T) {
newEncode := initNewEncodeProm(t)
encodeProm := newEncode.(*encodeProm)
encodeProm := newEncode.(*EncodeProm)
require.Equal(t, ":9103", encodeProm.port)
require.Equal(t, "test_", encodeProm.prefix)
require.Equal(t, 3, len(encodeProm.metrics))
Expand All @@ -95,7 +95,7 @@ func Test_NewEncodeProm(t *testing.T) {
require.Equal(t, cInfo.labelNames, expectedList)
entry := test.GetExtractMockEntry()
input := []config.GenericMap{entry}
output := encodeProm.Encode(input)
encodeProm.Encode(input)

entryLabels1 := make(map[string]string, 3)
entryLabels2 := make(map[string]string, 3)
Expand All @@ -115,8 +115,8 @@ func Test_NewEncodeProm(t *testing.T) {
"Labels": entryLabels2,
"value": 34,
}
require.Contains(t, output, gEntryInfo1)
require.Contains(t, output, gEntryInfo2)
require.Contains(t, encodeProm.PrevRecords, gEntryInfo1)
require.Contains(t, encodeProm.PrevRecords, gEntryInfo2)
gaugeA, err := gInfo.promGauge.GetMetricWith(entryLabels1)
require.Equal(t, nil, err)
bytesA := testutil.ToFloat64(gaugeA)
Expand Down Expand Up @@ -157,7 +157,7 @@ func Test_EncodeAggregate(t *testing.T) {
"count": 1,
}}

newEncode := &encodeProm{
newEncode := &EncodeProm{
port: ":0000",
prefix: "test_",
metrics: map[string]metricInfo{
Expand All @@ -174,7 +174,7 @@ func Test_EncodeAggregate(t *testing.T) {
mCache: make(metricCache),
}

output := newEncode.Encode(metrics)
newEncode.Encode(metrics)

gEntryInfo1 := config.GenericMap{
"Name": "test_gauge",
Expand All @@ -186,5 +186,5 @@ func Test_EncodeAggregate(t *testing.T) {
}

expectedOutput := []config.GenericMap{gEntryInfo1}
require.Equal(t, expectedOutput, output)
require.Equal(t, expectedOutput, newEncode.PrevRecords)
}
9 changes: 4 additions & 5 deletions pkg/pipeline/encode/encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,12 @@ func TestEncodeNone(t *testing.T) {
"varbool": false,
}
map3 := config.GenericMap{}
var out []config.GenericMap
var in []config.GenericMap
out = encodeNone.Encode(in)
require.Equal(t, 0, len(out))
encodeNone.Encode(in)
require.Equal(t, 0, len(encodeNone.prevRecords))
in = append(in, map1)
in = append(in, map2)
in = append(in, map3)
out = encodeNone.Encode(in)
require.Equal(t, len(in), len(out))
encodeNone.Encode(in)
require.Equal(t, len(in), len(encodeNone.prevRecords))
}
8 changes: 5 additions & 3 deletions pkg/pipeline/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func isReceptor(p *pipelineEntry) bool {
}

func isSender(p *pipelineEntry) bool {
return p.stageType != StageWrite
return p.stageType != StageWrite && p.stageType != StageEncode
}

func (b *builder) getStageNode(pe *pipelineEntry, stageID string) (interface{}, error) {
Expand Down Expand Up @@ -236,11 +236,13 @@ func (b *builder) getStageNode(pe *pipelineEntry, stageID string) (interface{},
}
})
case StageEncode:
stage = node.AsMiddle(func(in <-chan []config.GenericMap, out chan<- []config.GenericMap) {
encode := node.AsTerminal(func(in <-chan []config.GenericMap) {
for i := range in {
out <- pe.Encoder.Encode(i)
pe.Encoder.Encode(i)
}
})
b.terminalNodes = append(b.terminalNodes, encode)
stage = encode
case StageTransform:
stage = node.AsMiddle(func(in <-chan []config.GenericMap, out chan<- []config.GenericMap) {
for i := range in {
Expand Down
5 changes: 0 additions & 5 deletions pkg/test/e2e/kafka/flp-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ data:
follows: decode_json
- name: kafka_encode
follows: transform_none
- name: write_none
follows: kafka_encode
parameters:
- name: kafka_ingest
ingest:
Expand All @@ -34,6 +32,3 @@ data:
kafka:
address: my-cluster-kafka-bootstrap.default.svc:9092
topic: test_topic_out
- name: write_none
write:
type: none
5 changes: 0 additions & 5 deletions pkg/test/e2e/pipline/flp-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,6 @@ data:
prefix: flp_
type: prom
name: encode_prom
- name: write_none
write:
type: none
- name: write_loki
write:
loki:
Expand All @@ -299,7 +296,5 @@ data:
name: extract_aggregate
- follows: extract_aggregate
name: encode_prom
- follows: encode_prom
name: write_none
- follows: transform_network
name: write_loki

0 comments on commit fdd8ca2

Please sign in to comment.