limit memory used by FLP (#376)
* add maxMetrics to encode_prom

* added test for maxMetrics

* addressed reviewer comments

* limited number of connections tracked in conntrack

* added MaxConnectionsTracked to test result

* added accounting of discarded flow logs to metrics

* addressed review comments
KalmanMeth committed Feb 8, 2023
1 parent 5b70cdc commit b6979fe
Showing 14 changed files with 313 additions and 34 deletions.
15 changes: 14 additions & 1 deletion docs/api.md
@@ -24,6 +24,7 @@ Following is the supported API format for prometheus encode:
tls: TLS configuration for the prometheus endpoint
certPath: path to the certificate file
keyPath: path to the key file
maxMetrics: maximum number of metrics to report (default: unlimited)
</pre>
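
As an illustration of the new option, here is a minimal sketch (not taken from the repo) that sets it through the Go API, using only the fields this commit touches; the prefix, expiry, and limit values are made up, and port, TLS, and metric definitions are omitted:

```go
package main

import (
	"fmt"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
)

func main() {
	// Illustrative values only; MaxMetrics of 0 (the default) keeps the previous unlimited behavior.
	cfg := api.PromEncode{
		Prefix:     "flp_",
		ExpiryTime: 60,   // seconds of no-flow before a prometheus data item is deleted
		MaxMetrics: 1000, // report at most 1000 metrics
	}
	fmt.Printf("%+v\n", cfg)
}
```
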
## Kafka encode API
Following is the supported API format for kafka encode:
@@ -149,11 +150,22 @@ Following is the supported API format for network transformations:
add_location: add output location fields from input
add_service: add output network service field from input port and parameters protocol field
add_kubernetes: add output kubernetes fields from input
reinterpret_direction: reinterpret flow direction at a higher level than the interface
add_ip_category: categorize IPs based on known subnets configuration
parameters: parameters specific to type
assignee: value needs to assign to output field
kubeConfigPath: path to kubeconfig file (optional)
servicesFile: path to services file (optional, default: /etc/services)
protocolsFile: path to protocols file (optional, default: /etc/protocols)
ipCategories: configure IP categories
cidrs: list of CIDRs to match a category
name: name of the category
directionInfo: information to reinterpret flow direction (optional, to use with reinterpret_direction rule)
reporterIPField: field providing the reporter (agent) host IP
srcHostField: source host field
dstHostField: destination host field
flowDirectionField: field providing the flow direction in the input entries; it will be rewritten
ifDirectionField: interface-level field for flow direction, to create in output
</pre>
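
Purely as a rough sketch, and not part of this commit, the rules above might be combined in a stage configuration along the following lines; the nesting and the sample rule, category, CIDR, and field names are assumptions based on the documentation, not verified against the code. It is embedded as a Go raw string, following the convention of the tests added in this commit:

```go
// Hypothetical transform-network config sketch; values and nesting are illustrative only.
const transformNetworkSketch = `
parameters:
  - name: transform
    transform:
      type: network
      network:
        rules:
          - type: add_ip_category
            input: SrcAddr
            output: SrcCategory
        ipCategories:
          - name: internal
            cidrs:
              - 10.0.0.0/8
        directionInfo:
          reporterIPField: AgentIP
          srcHostField: SrcHostIP
          dstHostField: DstHostIP
          flowDirectionField: FlowDirection
          ifDirectionField: IfDirection
`
```
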
## Write Loki API
Following is the supported API format for writing to loki:
@@ -189,7 +201,7 @@ Following is the supported API format for specifying metrics aggregations:
aggregates:
name: description of aggregation result
groupByKeys: list of fields on which to aggregate
operationType: sum, min, max, avg or raw_values
operationType: sum, min, max, count, avg or raw_values
operationKey: internal field on which to perform the operation
</pre>
## Connection tracking API
@@ -221,6 +233,7 @@ Following is the supported API format for specifying connection tracking:
input: The input field to base the operation on. When omitted, 'name' is used
endConnectionTimeout: duration of time to wait from the last flow log to end a connection
updateConnectionInterval: duration of time to wait between update reports of a connection
maxConnectionsTracked: maximum number of connections we keep in our cache (0 means no limit)
</pre>
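
As a sketch, a conntrack stage capped at 100,000 tracked connections might be configured as below (again embedded as a Go raw string); the timeout values are illustrative and the key definition, output fields, and surrounding pipeline wiring are omitted:

```go
// Illustrative conntrack stage config; maxConnectionsTracked caps the in-memory
// connection store, and 0 (the default) means no limit.
const conntrackSketch = `
parameters:
  - name: conntrack
    extract:
      type: conntrack
      conntrack:
        endConnectionTimeout: 10s
        updateConnectionInterval: 30s
        maxConnectionsTracked: 100000
`
```
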
## Time-based Filters API
Following is the supported API format for specifying metrics time-based filters:
8 changes: 8 additions & 0 deletions docs/operational-metrics.md
@@ -71,6 +71,14 @@ Each table below provides documentation for an exported flowlogs-pipeline operational metric
| **Labels** | stage |


### metrics_dropped
| **Name** | metrics_dropped |
|:---|:---|
| **Description** | Number of metrics dropped |
| **Type** | counter |
| **Labels** | stage |


### metrics_processed
| **Name** | metrics_processed |
|:---|:---|
1 change: 1 addition & 0 deletions pkg/api/conntrack.go
@@ -34,6 +34,7 @@ type ConnTrack struct {
OutputFields []OutputField `yaml:"outputFields,omitempty" doc:"list of output fields"`
EndConnectionTimeout Duration `yaml:"endConnectionTimeout,omitempty" doc:"duration of time to wait from the last flow log to end a connection"`
UpdateConnectionInterval Duration `yaml:"updateConnectionInterval,omitempty" doc:"duration of time to wait between update reports of a connection"`
MaxConnectionsTracked int `yaml:"maxConnectionsTracked,omitempty" doc:"maximum number of connections we keep in our cache (0 means no limit)"`
}

type ConnTrackOutputRecordTypeEnum struct {
1 change: 1 addition & 0 deletions pkg/api/encode_prom.go
@@ -29,6 +29,7 @@ type PromEncode struct {
Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix added to each metric name"`
ExpiryTime int `yaml:"expiryTime,omitempty" json:"expiryTime,omitempty" doc:"seconds of no-flow to wait before deleting prometheus data item"`
TLS *PromTLSConf `yaml:"tls,omitempty" json:"tls,omitempty" doc:"TLS configuration for the prometheus endpoint"`
MaxMetrics int `yaml:"maxMetrics,omitempty" json:"maxMetrics,omitempty" doc:"maximum number of metrics to report (default: unlimited)"`
}

type PromEncodeOperationEnum struct {
2 changes: 1 addition & 1 deletion pkg/api/extract_aggregate.go
@@ -23,6 +23,6 @@ type AggregateOperation string
type AggregateDefinition struct {
Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"description of aggregation result"`
GroupByKeys AggregateBy `yaml:"groupByKeys,omitempty" json:"groupByKeys,omitempty" doc:"list of fields on which to aggregate"`
OperationType AggregateOperation `yaml:"operationType,omitempty" json:"operationType,omitempty" doc:"sum, min, max, avg or raw_values"`
OperationType AggregateOperation `yaml:"operationType,omitempty" json:"operationType,omitempty" doc:"sum, min, max, count, avg or raw_values"`
OperationKey string `yaml:"operationKey,omitempty" json:"operationKey,omitempty" doc:"internal field on which to perform the operation"`
}
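
For illustration, a hypothetical aggregation using the newly documented count operation could be built from this struct as follows; AggregateBy is assumed to be a string-slice type, and the name and key are made up:

```go
package main

import (
	"fmt"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
)

func main() {
	agg := api.AggregateDefinition{
		Name:          "connections_per_source",        // made-up name
		GroupByKeys:   api.AggregateBy{"srcIP"},        // assumes AggregateBy is a []string-like type
		OperationType: api.AggregateOperation("count"), // AggregateOperation is a string type (see above)
		// OperationKey is not needed for count
	}
	fmt.Printf("%+v\n", agg)
}
```
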
2 changes: 1 addition & 1 deletion pkg/config/pipeline_builder_test.go
@@ -152,7 +152,7 @@ func TestKafkaPromPipeline(t *testing.T) {

b, err = json.Marshal(params[2])
require.NoError(t, err)
require.JSONEq(t, `{"name":"conntrack","extract":{"type":"conntrack","conntrack":{"KeyDefinition":{"FieldGroups":null,"Hash":{"FieldGroupRefs":null,"FieldGroupARef":"","FieldGroupBRef":""}},"OutputRecordTypes":null,"OutputFields":null,"EndConnectionTimeout":"0s","UpdateConnectionInterval":"0s"}}}`, string(b))
require.JSONEq(t, `{"name":"conntrack","extract":{"type":"conntrack","conntrack":{"KeyDefinition":{"FieldGroups":null,"Hash":{"FieldGroupRefs":null,"FieldGroupARef":"","FieldGroupBRef":""}},"OutputRecordTypes":null,"MaxConnectionsTracked":0,"OutputFields":null,"EndConnectionTimeout":"0s","UpdateConnectionInterval":"0s"}}}`, string(b))

b, err = json.Marshal(params[3])
require.NoError(t, err)
22 changes: 19 additions & 3 deletions pkg/pipeline/encode/encode_prom.go
@@ -63,6 +63,7 @@ type EncodeProm struct {
server *http.Server
tlsConfig *api.PromTLSConf
metricsProcessed prometheus.Counter
metricsDropped prometheus.Counter
errorsCounter *prometheus.CounterVec
}

@@ -73,6 +74,12 @@ var (
operational.TypeCounter,
"stage",
)
metricsDropped = operational.DefineMetric(
"metrics_dropped",
"Number of metrics dropped",
operational.TypeCounter,
"stage",
)
encodePromErrors = operational.DefineMetric(
"encode_prom_errors",
"Total errors during metrics generation",
@@ -165,7 +172,11 @@ func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *api.PromMetrics

entryLabels, key := e.extractLabelsAndKey(flow, info)
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) })
_, ok := e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) })
if !ok {
e.metricsDropped.Inc()
return nil, 0
}
return entryLabels, floatVal
}

@@ -182,7 +193,11 @@ func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *api.PromMetri

entryLabels, key := e.extractLabelsAndKey(flow, info)
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) })
_, ok = e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) })
if !ok {
e.metricsDropped.Inc()
return nil, nil
}
return entryLabels, values
}

@@ -366,9 +381,10 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
histos: histos,
aggHistos: aggHistos,
expiryTime: expiryTime,
mCache: utils.NewTimedCache(),
mCache: utils.NewTimedCache(cfg.MaxMetrics),
exitChan: utils.ExitChannel(),
metricsProcessed: opMetrics.NewCounter(&metricsProcessed, params.Name),
metricsDropped: opMetrics.NewCounter(&metricsDropped, params.Name),
errorsCounter: opMetrics.NewCounterVec(&encodePromErrors),
}
go w.startServer()
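
The change above relies on utils.NewTimedCache now accepting a maximum size and on UpdateCacheEntry reporting whether the entry could be stored. The following is a simplified, hypothetical sketch of that contract, not the actual TimedCache implementation:

```go
package main

import "fmt"

// cappedCache is a simplified stand-in for utils.TimedCache, showing only the capacity
// contract the encoder relies on: existing keys are always refreshed, new keys are
// rejected once maxEntries (when > 0) is reached.
type cappedCache struct {
	maxEntries int
	entries    map[string]func() // key -> cleanup callback, as used by encode_prom
}

func newCappedCache(maxEntries int) *cappedCache {
	return &cappedCache{maxEntries: maxEntries, entries: map[string]func(){}}
}

// UpdateCacheEntry returns false when a new entry cannot be created because the cache
// is full; the caller then counts the metric as dropped instead of registering it.
func (c *cappedCache) UpdateCacheEntry(key string, cleanup func()) bool {
	if _, found := c.entries[key]; found {
		c.entries[key] = cleanup // refresh existing entry
		return true
	}
	if c.maxEntries > 0 && len(c.entries) >= c.maxEntries {
		return false
	}
	c.entries[key] = cleanup
	return true
}

func main() {
	c := newCappedCache(2)
	fmt.Println(c.UpdateCacheEntry("a", func() {})) // true
	fmt.Println(c.UpdateCacheEntry("b", func() {})) // true
	fmt.Println(c.UpdateCacheEntry("c", func() {})) // false: over the limit, dropped
	fmt.Println(c.UpdateCacheEntry("a", func() {})) // true: existing key refreshed
}
```
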
167 changes: 167 additions & 0 deletions pkg/pipeline/encode/prom_cache_test.go
@@ -0,0 +1,167 @@
/*
* Copyright (C) 2023 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package encode

import (
"testing"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
)

var (
yamlConfig1 = `
pipeline:
- name: encode
parameters:
- name: encode_prom
encode:
type: prom
prom:
port: 9103
prefix: test_
expiryTime: 1
maxMetrics: 30
metrics:
- name: bytes_count
type: counter
valueKey: Bytes
labels:
- SrcAddr
`

yamlConfig2 = `
pipeline:
- name: encode
parameters:
- name: encode_prom
encode:
type: prom
prom:
port: 9103
prefix: test_
expiryTime: 1
maxMetrics: 30
metrics:
- name: bytes_count
type: counter
valueKey: Bytes
labels:
- SrcAddr
- name: packets_count
type: counter
valueKey: Packets
labels:
- SrcAddr
`

yamlConfig3 = `
pipeline:
- name: encode
parameters:
- name: encode_prom
encode:
type: prom
prom:
port: 9103
prefix: test_
expiryTime: 1
metrics:
- name: bytes_count
type: counter
valueKey: Bytes
labels:
- SrcAddr
- name: packets_count
type: counter
valueKey: Packets
labels:
- SrcAddr
`
)

func encodeEntries(promEncode *EncodeProm, entries []config.GenericMap) {
for _, entry := range entries {
promEncode.Encode(entry)
}
}

// Test_Prom_Cache tests the integration between encode_prom and timebased_cache.
// Set a cache size, create many prom metrics, and verify that they interact properly.
func Test_Prom_Cache1(t *testing.T) {
var entries []config.GenericMap

v, cfg := test.InitConfig(t, yamlConfig1)
require.NotNil(t, v)

promEncode, cleanup, err := initPromWithServer(cfg.Parameters[0].Encode.Prom)
require.NoError(t, err)
defer cleanup()

entries = test.GenerateConnectionEntries(10)
require.Equal(t, 10, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 10, promEncode.mCache.GetCacheLen())

entries = test.GenerateConnectionEntries(40)
require.Equal(t, 40, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 30, promEncode.mCache.GetCacheLen())
}

func Test_Prom_Cache2(t *testing.T) {
var entries []config.GenericMap

v, cfg := test.InitConfig(t, yamlConfig2)
require.NotNil(t, v)

promEncode, cleanup, err := initPromWithServer(cfg.Parameters[0].Encode.Prom)
require.NoError(t, err)
defer cleanup()

entries = test.GenerateConnectionEntries(10)
require.Equal(t, 10, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 20, promEncode.mCache.GetCacheLen())

entries = test.GenerateConnectionEntries(40)
require.Equal(t, 40, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 30, promEncode.mCache.GetCacheLen())
}

func Test_Prom_Cache3(t *testing.T) {
var entries []config.GenericMap

v, cfg := test.InitConfig(t, yamlConfig3)
require.NotNil(t, v)

promEncode, cleanup, err := initPromWithServer(cfg.Parameters[0].Encode.Prom)
require.NoError(t, err)
defer cleanup()

entries = test.GenerateConnectionEntries(10)
require.Equal(t, 10, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 20, promEncode.mCache.GetCacheLen())

entries = test.GenerateConnectionEntries(40)
require.Equal(t, 40, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 80, promEncode.mCache.GetCacheLen())
}
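
The assertions in these tests follow a simple rule: each generated entry creates one cache entry per configured metric, and the total is capped by maxMetrics when it is set. A small helper, shown only for illustration and not part of the test file, captures the arithmetic:

```go
// expectedCacheLen mirrors the arithmetic behind the assertions above.
func expectedCacheLen(numEntries, metricsPerEntry, maxMetrics int) int {
	n := numEntries * metricsPerEntry
	if maxMetrics > 0 && n > maxMetrics {
		return maxMetrics // cache is capped; extra metrics are dropped
	}
	return n
}

// Test_Prom_Cache2: expectedCacheLen(10, 2, 30) == 20 and expectedCacheLen(40, 2, 30) == 30
// Test_Prom_Cache3 (no maxMetrics): expectedCacheLen(40, 2, 0) == 80
```
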
2 changes: 1 addition & 1 deletion pkg/pipeline/extract/aggregate/aggregate_test.go
@@ -37,7 +37,7 @@ func GetMockAggregate() Aggregate {
OperationType: "avg",
OperationKey: "value",
},
cache: utils.NewTimedCache(),
cache: utils.NewTimedCache(0),
mutex: &sync.Mutex{},
expiryTime: 30 * time.Second,
}
2 changes: 1 addition & 1 deletion pkg/pipeline/extract/aggregate/aggregates.go
@@ -61,7 +61,7 @@ func (aggregates *Aggregates) GetMetrics() []config.GenericMap {
func (aggregates *Aggregates) AddAggregate(aggregateDefinition api.AggregateDefinition) []Aggregate {
aggregate := Aggregate{
Definition: aggregateDefinition,
cache: utils.NewTimedCache(),
cache: utils.NewTimedCache(0),
mutex: &sync.Mutex{},
expiryTime: aggregates.expiryTime,
}
41 changes: 23 additions & 18 deletions pkg/pipeline/extract/conntrack/conntrack.go
@@ -67,24 +67,29 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericMap {
}
conn, exists := ct.connStore.getConnection(computedHash.hashTotal)
if !exists {
builder := NewConnBuilder()
conn = builder.
Hash(computedHash).
KeysFrom(fl, ct.config.KeyDefinition).
Aggregators(ct.aggregators).
NextUpdateReportTime(ct.clock.Now().Add(ct.config.UpdateConnectionInterval.Duration)).
Build()
ct.connStore.addConnection(computedHash.hashTotal, conn)
ct.updateConnection(conn, fl, computedHash)
ct.metrics.inputRecords.WithLabelValues("newConnection").Inc()
if ct.shouldOutputNewConnection {
record := conn.toGenericMap()
addHashField(record, computedHash.hashTotal)
addTypeField(record, api.ConnTrackOutputRecordTypeName("NewConnection"))
isFirst := conn.markReported()
addIsFirstField(record, isFirst)
outputRecords = append(outputRecords, record)
ct.metrics.outputRecords.WithLabelValues("newConnection").Inc()
if (ct.config.MaxConnectionsTracked > 0) && (ct.config.MaxConnectionsTracked <= ct.connStore.mom.Len()) {
log.Warningf("too many connections; skipping flow log %v: ", fl)
ct.metrics.inputRecords.WithLabelValues("discarded").Inc()
} else {
builder := NewConnBuilder()
conn = builder.
Hash(computedHash).
KeysFrom(fl, ct.config.KeyDefinition).
Aggregators(ct.aggregators).
NextUpdateReportTime(ct.clock.Now().Add(ct.config.UpdateConnectionInterval.Duration)).
Build()
ct.connStore.addConnection(computedHash.hashTotal, conn)
ct.updateConnection(conn, fl, computedHash)
ct.metrics.inputRecords.WithLabelValues("newConnection").Inc()
if ct.shouldOutputNewConnection {
record := conn.toGenericMap()
addHashField(record, computedHash.hashTotal)
addTypeField(record, api.ConnTrackOutputRecordTypeName("NewConnection"))
isFirst := conn.markReported()
addIsFirstField(record, isFirst)
outputRecords = append(outputRecords, record)
ct.metrics.outputRecords.WithLabelValues("newConnection").Inc()
}
}
} else {
ct.updateConnection(conn, fl, computedHash)
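
The guard added above reduces to a single condition; a hypothetical helper, not present in the commit, states it explicitly:

```go
// canTrackNewConnection reports whether a new connection may be admitted to the store:
// always when maxTracked is 0 (no limit), otherwise only while the store is below the cap.
func canTrackNewConnection(maxTracked, storeLen int) bool {
	return maxTracked == 0 || storeLen < maxTracked
}
```
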