Skip to content

Commit

Permalink
conntrack: Configurable timeouts (#357)
Browse files Browse the repository at this point in the history
* Add TBD comments

* WIP

* Add validation checks

* Remove old timeout and interval from config

* Remove TBD comments

* Update comments

* groupId->groupIdx

* Add comment

* Update comment

* Rename ConnTrackSchedulingSelector -> ConnTrackSchedulingGroup

* Fix tests

* Fix bug in tests

* Add test for scheduling groups

* Update README.md

* Make linter happy

* Split metric conntrack_memory_connections into scheduling groups

* Change maps to slices for efficiency

* Join multiple slices into a single slice

* Fix metric conntrack_memory_connections to show the correct length of each group

* Remove entries from hashId2groupIdx map on connection removal

* Update configurations

* Convert selector values to string

Some of the grpc ingester fields are of type uint32.
The yaml/json parser parses numeric values as int32.
So, comparing uint32 to int32 where both static types are
interface{} will always evaluate to false regardless of the actual values.
Hence, we convert and compare strings.

* Add test case for 2 default selectors

* Add special cases conversion and benchmark

* Make linter gofmt happy

* Fix default and string cases and add test cases

* Cleanup

* Fix rebase
  • Loading branch information
ronensc committed Feb 8, 2023
1 parent f7794d4 commit eaac68e
Show file tree
Hide file tree
Showing 19 changed files with 936 additions and 137 deletions.
15 changes: 13 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,6 @@ parameters:
extract:
type: conntrack
conntrack:
endConnectionTimeout: 30s
updateConnectionInterval: 10s
keyDefinition:
fieldGroups:
- name: src
Expand Down Expand Up @@ -625,6 +623,14 @@ parameters:
- name: TimeFlowEnd
operation: max
input: TimeReceived
scheduling:
- selector: # UDP connections
Proto: 17
endConnectionTimeout: 5s
updateConnectionInterval: 40s
- selector: {} # Default group
endConnectionTimeout: 10s
updateConnectionInterval: 30s
```
A possible output would look like:
Expand Down Expand Up @@ -669,6 +675,11 @@ It is set to true only on the first record of the connection.
The `_IsFirst` fields is useful in cases where `newConnection` records are not outputted (to reduce the number output records)
and there is a need to count the total number of connections: simply counting `_IsFirst=true`
The configuration allows defining scheduling groups. That is, defining different timeouts based on connection key fields' values.
The order of the defined groups is important since the group of a connection is determined by the first matching group.
The last group must have an empty selector indicating a match-all rule serving as a default group for connections that
don't match any of the other groups. There can't be more than one default group.

### Timebased TopK

It is sometimes desirable to return only a subset of records, such as those connections that use the most bandwidth.
Expand Down
4 changes: 3 additions & 1 deletion contrib/kubernetes/flowlogs-pipeline.conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ parameters:
- name: timeEnd
operation: max
input: timeReceived
endConnectionTimeout: 30s
scheduling:
- endConnectionTimeout: 10s
updateConnectionInterval: 30s
- name: transform_network
transform:
type: network
Expand Down
6 changes: 4 additions & 2 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,10 @@ Following is the supported API format for specifying connection tracking:
max: max
splitAB: When true, 2 output fields will be created. One for A->B and one for B->A flows.
input: The input field to base the operation on. When omitted, 'name' is used
endConnectionTimeout: duration of time to wait from the last flow log to end a connection
updateConnectionInterval: duration of time to wait between update reports of a connection
scheduling: list of timeouts and intervals to apply per selector
selector: key-value map to match against connection fields to apply this scheduling
endConnectionTimeout: duration of time to wait from the last flow log to end a connection
updateConnectionInterval: duration of time to wait between update reports of a connection
maxConnectionsTracked: maximum number of connections we keep in our cache (0 means no limit)
</pre>
## Time-based Filters API
Expand Down
2 changes: 1 addition & 1 deletion docs/operational-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Each table below provides documentation for an exported flowlogs-pipeline operat
|:---|:---|
| **Description** | The total number of tracked connections in memory. |
| **Type** | gauge |
| **Labels** | |
| **Labels** | group |


### conntrack_output_records
Expand Down
5 changes: 4 additions & 1 deletion network_definitions/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ transform:
output: timeReceived
extract:
conntrack:
endConnectionTimeout: 30s
scheduling:
- selector: {}
updateConnectionInterval: 30s
endConnectionTimeout: 10s
outputRecordTypes:
- newConnection
- flowLog
Expand Down
56 changes: 49 additions & 7 deletions pkg/api/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@ const (

type ConnTrack struct {
// TODO: should by a pointer instead?
KeyDefinition KeyDefinition `yaml:"keyDefinition,omitempty" doc:"fields that are used to identify the connection"`
OutputRecordTypes []string `yaml:"outputRecordTypes,omitempty" enum:"ConnTrackOutputRecordTypeEnum" doc:"output record types to emit"`
OutputFields []OutputField `yaml:"outputFields,omitempty" doc:"list of output fields"`
EndConnectionTimeout Duration `yaml:"endConnectionTimeout,omitempty" doc:"duration of time to wait from the last flow log to end a connection"`
UpdateConnectionInterval Duration `yaml:"updateConnectionInterval,omitempty" doc:"duration of time to wait between update reports of a connection"`
MaxConnectionsTracked int `yaml:"maxConnectionsTracked,omitempty" doc:"maximum number of connections we keep in our cache (0 means no limit)"`
KeyDefinition KeyDefinition `yaml:"keyDefinition,omitempty" doc:"fields that are used to identify the connection"`
OutputRecordTypes []string `yaml:"outputRecordTypes,omitempty" enum:"ConnTrackOutputRecordTypeEnum" doc:"output record types to emit"`
OutputFields []OutputField `yaml:"outputFields,omitempty" doc:"list of output fields"`
Scheduling []ConnTrackSchedulingGroup `yaml:"scheduling,omitempty" doc:"list of timeouts and intervals to apply per selector"`
MaxConnectionsTracked int `yaml:"maxConnectionsTracked,omitempty" doc:"maximum number of connections we keep in our cache (0 means no limit)"`
}

type ConnTrackOutputRecordTypeEnum struct {
Expand Down Expand Up @@ -84,6 +83,12 @@ type ConnTrackOperationEnum struct {
Max string `yaml:"max" doc:"max"`
}

type ConnTrackSchedulingGroup struct {
Selector map[string]interface{} `yaml:"selector,omitempty" doc:"key-value map to match against connection fields to apply this scheduling"`
EndConnectionTimeout Duration `yaml:"endConnectionTimeout,omitempty" doc:"duration of time to wait from the last flow log to end a connection"`
UpdateConnectionInterval Duration `yaml:"updateConnectionInterval,omitempty" doc:"duration of time to wait between update reports of a connection"`
}

func ConnTrackOperationName(operation string) string {
return GetEnumName(ConnTrackOperationEnum{}, operation)
}
Expand Down Expand Up @@ -162,6 +167,40 @@ func (ct *ConnTrack) Validate() error {
msg: fmt.Errorf("undefined output record type %q", ort)}
}
}

definedKeys := map[string]struct{}{}
for _, fg := range ct.KeyDefinition.FieldGroups {
for _, k := range fg.Fields {
addToSet(definedKeys, k)
}
}
for i, group := range ct.Scheduling {
for k := range group.Selector {
if _, found := definedKeys[k]; !found {
return conntrackInvalidError{undefinedSelectorKey: true,
msg: fmt.Errorf("selector key %q in scheduling group %v is not defined in the keys", k, i)}
}
}
}

numOfDefault := 0
for i, group := range ct.Scheduling {
isDefaultSelector := (len(group.Selector) == 0)
isLastGroup := (i == len(ct.Scheduling)-1)
if isDefaultSelector {
numOfDefault++
}
if isDefaultSelector && !isLastGroup {
return conntrackInvalidError{defaultGroupAndNotLast: true,
msg: fmt.Errorf("scheduling group %v has a default selector but is not the last scheduling group", i)}
}
}

if numOfDefault != 1 {
return conntrackInvalidError{exactlyOneDefaultSelector: true,
msg: fmt.Errorf("found %v default selectors. There should be exactly 1", numOfDefault)}
}

return nil
}

Expand Down Expand Up @@ -211,6 +250,9 @@ type conntrackInvalidError struct {
undefinedFieldGroupBRef bool
undefinedFieldGroupRef bool
unknownOutputRecord bool
undefinedSelectorKey bool
defaultGroupAndNotLast bool
exactlyOneDefaultSelector bool
}

func (err conntrackInvalidError) Error() string {
Expand All @@ -220,7 +262,7 @@ func (err conntrackInvalidError) Error() string {
return ""
}

// Is makes 2 conntrackInvalidError objects equal if all their fields except for `msg` are the equal.
// Is makes 2 conntrackInvalidError objects equal if all their fields except for `msg` are equal.
// This is useful in the tests where we don't want to repeat the error message.
// Is() is invoked by errors.Is() which is invoked by require.ErrorIs().
func (err conntrackInvalidError) Is(target error) bool {
Expand Down
67 changes: 67 additions & 0 deletions pkg/api/conntrack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,73 @@ func TestConnTrackValidate(t *testing.T) {
},
conntrackInvalidError{unknownOutputRecord: true},
},
{
"Undefined selector key",
ConnTrack{
KeyDefinition: KeyDefinition{
FieldGroups: []FieldGroup{
{Name: "src", Fields: []string{"srcIP"}},
},
},
Scheduling: []ConnTrackSchedulingGroup{
{
Selector: map[string]interface{}{
"srcIP": "value",
"undefined_key": "0",
},
},
},
},
conntrackInvalidError{undefinedSelectorKey: true},
},
{
"Default selector on a scheduling group that is not the last scheduling group",
ConnTrack{
KeyDefinition: KeyDefinition{
FieldGroups: []FieldGroup{
{Name: "src", Fields: []string{"srcIP"}},
},
},
Scheduling: []ConnTrackSchedulingGroup{
{
Selector: map[string]interface{}{},
},
{
Selector: map[string]interface{}{
"srcIP": "value",
},
},
},
},
conntrackInvalidError{defaultGroupAndNotLast: true},
},
{
"Missing default selector",
ConnTrack{
KeyDefinition: KeyDefinition{
FieldGroups: []FieldGroup{
{Name: "src", Fields: []string{"srcIP"}},
},
},
Scheduling: []ConnTrackSchedulingGroup{},
},
conntrackInvalidError{exactlyOneDefaultSelector: true},
},
{
"2 default selectors",
ConnTrack{
KeyDefinition: KeyDefinition{
FieldGroups: []FieldGroup{
{Name: "src", Fields: []string{"srcIP"}},
},
},
Scheduling: []ConnTrackSchedulingGroup{
{Selector: map[string]interface{}{}},
{Selector: map[string]interface{}{}},
},
},
conntrackInvalidError{defaultGroupAndNotLast: true},
},
}

for _, tt := range tests {
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/pipeline_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func TestKafkaPromPipeline(t *testing.T) {

b, err = json.Marshal(params[2])
require.NoError(t, err)
require.JSONEq(t, `{"name":"conntrack","extract":{"type":"conntrack","conntrack":{"KeyDefinition":{"FieldGroups":null,"Hash":{"FieldGroupRefs":null,"FieldGroupARef":"","FieldGroupBRef":""}},"OutputRecordTypes":null,"MaxConnectionsTracked":0,"OutputFields":null,"EndConnectionTimeout":"0s","UpdateConnectionInterval":"0s"}}}`, string(b))
require.JSONEq(t, `{"name":"conntrack","extract":{"type":"conntrack","conntrack":{"KeyDefinition":{"FieldGroups":null,"Hash":{"FieldGroupRefs":null,"FieldGroupARef":"","FieldGroupBRef":""}},"OutputRecordTypes":null,"MaxConnectionsTracked":0,"OutputFields":null,"Scheduling":null}}}`, string(b))

b, err = json.Marshal(params[3])
require.NoError(t, err)
Expand Down
11 changes: 11 additions & 0 deletions pkg/operational/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,17 @@ func (o *Metrics) NewGauge(def *MetricDefinition, labels ...string) prometheus.G
return c
}

func (o *Metrics) NewGaugeVec(def *MetricDefinition) *prometheus.GaugeVec {
verifyMetricType(def, TypeGauge)
fullName := o.settings.Prefix + def.Name
c := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: fullName,
Help: def.Help,
}, def.Labels)
o.register(c, fullName)
return c
}

func (o *Metrics) NewGaugeFunc(def *MetricDefinition, f func() float64, labels ...string) {
verifyMetricType(def, TypeGauge)
fullName := o.settings.Prefix + def.Name
Expand Down
4 changes: 3 additions & 1 deletion pkg/pipeline/conntrack_integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ parameters:
extract:
type: conntrack
conntrack:
endConnectionTimeout: 1s
scheduling:
- selector: {}
endConnectionTimeout: 1s
outputRecordTypes:
- newConnection
- flowLog
Expand Down
61 changes: 56 additions & 5 deletions pkg/pipeline/extract/conntrack/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package conntrack

import (
"fmt"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
log "github.com/sirupsen/logrus"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
Expand All @@ -40,6 +42,7 @@ type connection interface {
// for this connection (i.e. newConnection, updateConnection, endConnection).
// It returns true on the first invocation to indicate the first report. Otherwise, it returns false.
markReported() bool
isMatchSelector(map[string]interface{}) bool
}

type connType struct {
Expand Down Expand Up @@ -106,6 +109,59 @@ func (c *connType) markReported() bool {
return isFirst
}

func (c *connType) isMatchSelector(selector map[string]interface{}) bool {
for k, v := range selector {
connValueRaw, found := c.keys[k]
if !found {
return false
}
switch connValue := connValueRaw.(type) {
case int:
selectorValue, err := utils.ConvertToInt(v)
if err != nil || connValue != selectorValue {
return false
}
case uint32:
selectorValue, err := utils.ConvertToUint32(v)
if err != nil || connValue != selectorValue {
return false
}
case uint64:
selectorValue, err := utils.ConvertToUint64(v)
if err != nil || connValue != selectorValue {
return false
}
case int64:
selectorValue, err := utils.ConvertToInt64(v)
if err != nil || connValue != selectorValue {
return false
}
case float64:
selectorValue, err := utils.ConvertToFloat64(v)
if err != nil || connValue != selectorValue {
return false
}
case bool:
selectorValue, err := utils.ConvertToBool(v)
if err != nil || connValue != selectorValue {
return false
}
case string:
selectorValue := fmt.Sprintf("%v", v)
if connValue != selectorValue {
return false
}
default:
connValue = fmt.Sprintf("%v", connValue)
selectorValue := fmt.Sprintf("%v", v)
if connValue != selectorValue {
return false
}
}
}
return true
}

type connBuilder struct {
conn *connType
}
Expand Down Expand Up @@ -141,11 +197,6 @@ func (cb *connBuilder) Aggregators(aggs []aggregator) *connBuilder {
return cb
}

func (cb *connBuilder) NextUpdateReportTime(t time.Time) *connBuilder {
cb.conn.setNextUpdateReportTime(t)
return cb
}

func (cb *connBuilder) Build() connection {
return cb.conn
}
Loading

0 comments on commit eaac68e

Please sign in to comment.