diff --git a/README.md b/README.md index 37ff4f5ff..ed5a7c3ec 100644 --- a/README.md +++ b/README.md @@ -631,9 +631,11 @@ parameters: Proto: 17 endConnectionTimeout: 5s heartbeatInterval: 40s + terminatingTimeout: 5s - selector: {} # Default group endConnectionTimeout: 10s heartbeatInterval: 30s + terminatingTimeout: 5s tcpFlags: fieldName: Flags detectEndConnection: true diff --git a/contrib/kubernetes/flowlogs-pipeline.conf.yaml b/contrib/kubernetes/flowlogs-pipeline.conf.yaml index bb7636589..2edaf8ea4 100644 --- a/contrib/kubernetes/flowlogs-pipeline.conf.yaml +++ b/contrib/kubernetes/flowlogs-pipeline.conf.yaml @@ -107,6 +107,7 @@ parameters: scheduling: - endConnectionTimeout: 10s heartbeatInterval: 30s + terminatingTimeout: 5s tcpFlags: fieldName: TCPFlags detectEndConnection: true diff --git a/docs/api.md b/docs/api.md index cad853881..22e66baf5 100644 --- a/docs/api.md +++ b/docs/api.md @@ -225,16 +225,19 @@ Following is the supported API format for specifying connection tracking: count: count min: min max: max + first: first + last: last splitAB: When true, 2 output fields will be created. One for A->B and one for B->A flows. input: The input field to base the operation on. When omitted, 'name' is used scheduling: list of timeouts and intervals to apply per selector selector: key-value map to match against connection fields to apply this scheduling endConnectionTimeout: duration of time to wait from the last flow log to end a connection + terminatingTimeout: duration of time to wait from detected FIN flag to end a connection heartbeatInterval: duration of time to wait between heartbeat reports of a connection maxConnectionsTracked: maximum number of connections we keep in our cache (0 means no limit) tcpFlags: settings for handling TCP flags fieldName: name of the field containing TCP flags - detectEndConnection: detect end connections by FIN_ACK flag + detectEndConnection: detect end connections by FIN flag swapAB: swap source and destination when the first flowlog contains the SYN_ACK flag ## Time-based Filters API diff --git a/docs/operational-metrics.md b/docs/operational-metrics.md index fdc36c8de..b50b8ac7c 100644 --- a/docs/operational-metrics.md +++ b/docs/operational-metrics.md @@ -18,9 +18,9 @@ Each table below provides documentation for an exported flowlogs-pipeline operat ### conntrack_memory_connections | **Name** | conntrack_memory_connections | |:---|:---| -| **Description** | The total number of tracked connections in memory. | +| **Description** | The total number of tracked connections in memory per group and phase. | | **Type** | gauge | -| **Labels** | group | +| **Labels** | group, phase | ### conntrack_output_records diff --git a/network_definitions/config.yaml b/network_definitions/config.yaml index bb063ad9e..f2d19c715 100644 --- a/network_definitions/config.yaml +++ b/network_definitions/config.yaml @@ -41,6 +41,7 @@ extract: - selector: {} heartbeatInterval: 30s endConnectionTimeout: 10s + terminatingTimeout: 5s outputRecordTypes: - newConnection - flowLog diff --git a/pkg/api/conntrack.go b/pkg/api/conntrack.go index a0ebfb131..01bf30779 100644 --- a/pkg/api/conntrack.go +++ b/pkg/api/conntrack.go @@ -81,11 +81,14 @@ type ConnTrackOperationEnum struct { Count string `yaml:"count" json:"count" doc:"count"` Min string `yaml:"min" json:"min" doc:"min"` Max string `yaml:"max" json:"max" doc:"max"` + First string `yaml:"first" json:"first" doc:"first"` + Last string `yaml:"last" json:"last" doc:"last"` } type ConnTrackSchedulingGroup struct { Selector map[string]interface{} `yaml:"selector,omitempty" json:"selector,omitempty" doc:"key-value map to match against connection fields to apply this scheduling"` EndConnectionTimeout Duration `yaml:"endConnectionTimeout,omitempty" json:"endConnectionTimeout,omitempty" doc:"duration of time to wait from the last flow log to end a connection"` + TerminatingTimeout Duration `yaml:"terminatingTimeout,omitempty" json:"terminatingTimeout,omitempty" doc:"duration of time to wait from detected FIN flag to end a connection"` HeartbeatInterval Duration `yaml:"heartbeatInterval,omitempty" json:"heartbeatInterval,omitempty" doc:"duration of time to wait between heartbeat reports of a connection"` } @@ -95,7 +98,7 @@ func ConnTrackOperationName(operation string) string { type ConnTrackTCPFlags struct { FieldName string `yaml:"fieldName,omitempty" json:"fieldName,omitempty" doc:"name of the field containing TCP flags"` - DetectEndConnection bool `yaml:"detectEndConnection,omitempty" json:"detectEndConnection,omitempty" doc:"detect end connections by FIN_ACK flag"` + DetectEndConnection bool `yaml:"detectEndConnection,omitempty" json:"detectEndConnection,omitempty" doc:"detect end connections by FIN flag"` SwapAB bool `yaml:"swapAB,omitempty" json:"swapAB,omitempty" doc:"swap source and destination when the first flowlog contains the SYN_ACK flag"` } @@ -113,7 +116,7 @@ func (ct *ConnTrack) Validate() error { return conntrackInvalidError{splitABWithNoBidi: true, msg: fmt.Errorf("output field %q has splitAB=true although bidirection is not enabled (fieldGroupARef is empty)", of.Name)} } - if !isOperationValid(of.Operation) { + if !isOperationValid(of.Operation, of.SplitAB) { return conntrackInvalidError{unknownOperation: true, msg: fmt.Errorf("unknown operation %q in output field %q", of.Operation, of.Name)} } @@ -250,13 +253,15 @@ func addToSet(set map[string]struct{}, item string) bool { return true } -func isOperationValid(value string) bool { +func isOperationValid(value string, splitAB bool) bool { valid := true switch value { case ConnTrackOperationName("Sum"): case ConnTrackOperationName("Count"): case ConnTrackOperationName("Min"): case ConnTrackOperationName("Max"): + case ConnTrackOperationName("First"), ConnTrackOperationName("Last"): + valid = !splitAB default: valid = false } diff --git a/pkg/pipeline/conntrack_integ_test.go b/pkg/pipeline/conntrack_integ_test.go index 32271e5fa..ae1fea284 100644 --- a/pkg/pipeline/conntrack_integ_test.go +++ b/pkg/pipeline/conntrack_integ_test.go @@ -51,6 +51,8 @@ parameters: scheduling: - selector: {} endConnectionTimeout: 1s + heartbeatInterval: 10s + terminatingTimeout: 5s outputRecordTypes: - newConnection - flowLog @@ -120,7 +122,7 @@ func TestConnTrack(t *testing.T) { }, test2.Interval(10*time.Millisecond)) // Wait a moment to make the connections expired - time.Sleep(2 * time.Second) + time.Sleep(5 * time.Second) // Send something to the pipeline to allow the connection tracking output end connection records in <- config.GenericMap{"DstAddr": "1.2.3.4"} diff --git a/pkg/pipeline/extract/conntrack/aggregator.go b/pkg/pipeline/extract/conntrack/aggregator.go index cdd63da03..e6b1abab4 100644 --- a/pkg/pipeline/extract/conntrack/aggregator.go +++ b/pkg/pipeline/extract/conntrack/aggregator.go @@ -33,20 +33,22 @@ type aggregator interface { // addField adds an aggregate field to the connection addField(conn connection) // update updates the aggregate field in the connection based on the flow log. - update(conn connection, flowLog config.GenericMap, d direction) + update(conn connection, flowLog config.GenericMap, d direction, isFirst bool) } type aggregateBase struct { inputField string outputField string splitAB bool - initVal float64 + initVal interface{} } type aSum struct{ aggregateBase } type aCount struct{ aggregateBase } type aMin struct{ aggregateBase } type aMax struct{ aggregateBase } +type aFirst struct{ aggregateBase } +type aLast struct{ aggregateBase } // TODO: think of adding a more complex operation such as Average Packet Size which involves 2 input fields: Bytes/Packets @@ -65,10 +67,10 @@ func newAggregator(of api.OutputField) (aggregator, error) { var agg aggregator switch of.Operation { case api.ConnTrackOperationName("Sum"): - aggBase.initVal = 0 + aggBase.initVal = float64(0) agg = &aSum{aggBase} case api.ConnTrackOperationName("Count"): - aggBase.initVal = 0 + aggBase.initVal = float64(0) agg = &aCount{aggBase} case api.ConnTrackOperationName("Min"): aggBase.initVal = math.MaxFloat64 @@ -76,6 +78,12 @@ func newAggregator(of api.OutputField) (aggregator, error) { case api.ConnTrackOperationName("Max"): aggBase.initVal = -math.MaxFloat64 agg = &aMax{aggBase} + case api.ConnTrackOperationName("First"): + aggBase.initVal = nil + agg = &aFirst{aggBase} + case api.ConnTrackOperationName("Last"): + aggBase.initVal = nil + agg = &aLast{aggBase} default: return nil, fmt.Errorf("unknown operation: %q", of.Operation) } @@ -118,26 +126,26 @@ func (agg *aggregateBase) addField(conn connection) { } } -func (agg *aSum) update(conn connection, flowLog config.GenericMap, d direction) { +func (agg *aSum) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) { outputField := agg.getOutputField(d) v, err := agg.getInputFieldValue(flowLog) if err != nil { log.Errorf("error updating connection %x: %v", conn.getHash().hashTotal, err) return } - conn.updateAggValue(outputField, func(curr float64) float64 { + conn.updateAggFnValue(outputField, func(curr float64) float64 { return curr + v }) } -func (agg *aCount) update(conn connection, flowLog config.GenericMap, d direction) { +func (agg *aCount) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) { outputField := agg.getOutputField(d) - conn.updateAggValue(outputField, func(curr float64) float64 { + conn.updateAggFnValue(outputField, func(curr float64) float64 { return curr + 1 }) } -func (agg *aMin) update(conn connection, flowLog config.GenericMap, d direction) { +func (agg *aMin) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) { outputField := agg.getOutputField(d) v, err := agg.getInputFieldValue(flowLog) if err != nil { @@ -145,12 +153,12 @@ func (agg *aMin) update(conn connection, flowLog config.GenericMap, d direction) return } - conn.updateAggValue(outputField, func(curr float64) float64 { + conn.updateAggFnValue(outputField, func(curr float64) float64 { return math.Min(curr, v) }) } -func (agg *aMax) update(conn connection, flowLog config.GenericMap, d direction) { +func (agg *aMax) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) { outputField := agg.getOutputField(d) v, err := agg.getInputFieldValue(flowLog) if err != nil { @@ -158,7 +166,17 @@ func (agg *aMax) update(conn connection, flowLog config.GenericMap, d direction) return } - conn.updateAggValue(outputField, func(curr float64) float64 { + conn.updateAggFnValue(outputField, func(curr float64) float64 { return math.Max(curr, v) }) } + +func (cp *aFirst) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) { + if isNew { + conn.updateAggValue(cp.outputField, flowLog[cp.inputField]) + } +} + +func (cp *aLast) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) { + conn.updateAggValue(cp.outputField, flowLog[cp.inputField]) +} diff --git a/pkg/pipeline/extract/conntrack/aggregator_test.go b/pkg/pipeline/extract/conntrack/aggregator_test.go index b52340e64..2f5c84c18 100644 --- a/pkg/pipeline/extract/conntrack/aggregator_test.go +++ b/pkg/pipeline/extract/conntrack/aggregator_test.go @@ -45,6 +45,14 @@ func TestNewAggregator_Invalid(t *testing.T) { Input: "Input", }) require.NotNil(t, err) + + // invalid first agg + _, err = newAggregator(api.OutputField{ + Operation: "first", + SplitAB: true, + Input: "Input", + }) + require.NotNil(t, err) } func TestNewAggregator_Valid(t *testing.T) { @@ -56,27 +64,27 @@ func TestNewAggregator_Valid(t *testing.T) { { name: "Default SplitAB", outputField: api.OutputField{Name: "MyAgg", Operation: "sum"}, - expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, 0}}, + expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0)}}, }, { name: "Default input", outputField: api.OutputField{Name: "MyAgg", Operation: "sum", SplitAB: true}, - expected: &aSum{aggregateBase{"MyAgg", "MyAgg", true, 0}}, + expected: &aSum{aggregateBase{"MyAgg", "MyAgg", true, float64(0)}}, }, { name: "Custom input", outputField: api.OutputField{Name: "MyAgg", Operation: "sum", Input: "MyInput"}, - expected: &aSum{aggregateBase{"MyInput", "MyAgg", false, 0}}, + expected: &aSum{aggregateBase{"MyInput", "MyAgg", false, float64(0)}}, }, { name: "OperationType sum", outputField: api.OutputField{Name: "MyAgg", Operation: "sum"}, - expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, 0}}, + expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0)}}, }, { name: "OperationType count", outputField: api.OutputField{Name: "MyAgg", Operation: "count"}, - expected: &aCount{aggregateBase{"MyAgg", "MyAgg", false, 0}}, + expected: &aCount{aggregateBase{"MyAgg", "MyAgg", false, float64(0)}}, }, { name: "OperationType max", @@ -88,6 +96,21 @@ func TestNewAggregator_Valid(t *testing.T) { outputField: api.OutputField{Name: "MyAgg", Operation: "min"}, expected: &aMin{aggregateBase{"MyAgg", "MyAgg", false, math.MaxFloat64}}, }, + { + name: "Default first", + outputField: api.OutputField{Name: "MyCp", Operation: "first"}, + expected: &aFirst{aggregateBase{"MyCp", "MyCp", false, nil}}, + }, + { + name: "Custom input first", + outputField: api.OutputField{Name: "MyCp", Operation: "first", Input: "MyInput"}, + expected: &aFirst{aggregateBase{"MyInput", "MyCp", false, nil}}, + }, + { + name: "Default last", + outputField: api.OutputField{Name: "MyCp", Operation: "last"}, + expected: &aLast{aggregateBase{"MyCp", "MyCp", false, nil}}, + }, } for _, test := range table { @@ -106,6 +129,8 @@ func TestAddField_and_Update(t *testing.T) { {Name: "numFlowLogs", Operation: "count"}, {Name: "minFlowLogBytes", Operation: "min", Input: "Bytes"}, {Name: "maxFlowLogBytes", Operation: "max", Input: "Bytes"}, + {Name: "FirstFlowDirection", Operation: "first", Input: "FlowDirection"}, + {Name: "LastFlowDirection", Operation: "last", Input: "FlowDirection"}, } var aggs []aggregator for _, of := range ofs { @@ -119,24 +144,26 @@ func TestAddField_and_Update(t *testing.T) { portA := 1 portB := 9002 protocolA := 6 + flowDirA := 0 + flowDirB := 1 table := []struct { name string flowLog config.GenericMap direction direction - expected map[string]float64 + expected map[string]interface{} }{ { name: "flowLog 1", - flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, 100, 10, false), + flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDirA, 100, 10, false), direction: dirAB, - expected: map[string]float64{"Bytes_AB": 100, "Bytes_BA": 0, "Packets": 10, "maxFlowLogBytes": 100, "minFlowLogBytes": 100, "numFlowLogs": 1}, + expected: map[string]interface{}{"Bytes_AB": float64(100), "Bytes_BA": float64(0), "Packets": float64(10), "maxFlowLogBytes": float64(100), "minFlowLogBytes": float64(100), "numFlowLogs": float64(1), "FirstFlowDirection": 0, "LastFlowDirection": 0}, }, { name: "flowLog 2", - flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, 200, 20, false), + flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDirB, 200, 20, false), direction: dirBA, - expected: map[string]float64{"Bytes_AB": 100, "Bytes_BA": 200, "Packets": 30, "maxFlowLogBytes": 200, "minFlowLogBytes": 100, "numFlowLogs": 2}, + expected: map[string]interface{}{"Bytes_AB": float64(100), "Bytes_BA": float64(200), "Packets": float64(30), "maxFlowLogBytes": float64(200), "minFlowLogBytes": float64(100), "numFlowLogs": float64(2), "FirstFlowDirection": 0, "LastFlowDirection": 1}, }, } @@ -144,13 +171,13 @@ func TestAddField_and_Update(t *testing.T) { for _, agg := range aggs { agg.addField(conn) } - expectedInits := map[string]float64{"Bytes_AB": 0, "Bytes_BA": 0, "Packets": 0, "maxFlowLogBytes": -math.MaxFloat64, "minFlowLogBytes": math.MaxFloat64, "numFlowLogs": 0} + expectedInits := map[string]interface{}{"Bytes_AB": float64(0), "Bytes_BA": float64(0), "Packets": float64(0), "maxFlowLogBytes": float64(-math.MaxFloat64), "minFlowLogBytes": float64(math.MaxFloat64), "numFlowLogs": float64(0), "FirstFlowDirection": nil, "LastFlowDirection": nil} require.Equal(t, expectedInits, conn.(*connType).aggFields) - for _, test := range table { + for i, test := range table { t.Run(test.name, func(t *testing.T) { for _, agg := range aggs { - agg.update(conn, test.flowLog, test.direction) + agg.update(conn, test.flowLog, test.direction, i == 0) } require.Equal(t, test.expected, conn.(*connType).aggFields) }) diff --git a/pkg/pipeline/extract/conntrack/conn.go b/pkg/pipeline/extract/conntrack/conn.go index e5afc3576..8cf00ff87 100644 --- a/pkg/pipeline/extract/conntrack/conn.go +++ b/pkg/pipeline/extract/conntrack/conn.go @@ -29,9 +29,9 @@ import ( ) type connection interface { - addAgg(fieldName string, initValue float64) - getAggValue(fieldName string) (float64, bool) - updateAggValue(fieldName string, newValueFn func(curr float64) float64) + addAgg(fieldName string, initValue interface{}) + updateAggValue(fieldName string, newValue interface{}) + updateAggFnValue(fieldName string, newValueFn func(curr float64) float64) setExpiryTime(t time.Time) getExpiryTime() time.Time setNextHeartbeatTime(t time.Time) @@ -48,27 +48,37 @@ type connection interface { type connType struct { hash totalHashType keys config.GenericMap - aggFields map[string]float64 + aggFields map[string]interface{} expiryTime time.Time nextHeartbeatTime time.Time isReported bool } -func (c *connType) addAgg(fieldName string, initValue float64) { +func (c *connType) addAgg(fieldName string, initValue interface{}) { c.aggFields[fieldName] = initValue } -func (c *connType) getAggValue(fieldName string) (float64, bool) { - v, ok := c.aggFields[fieldName] - return v, ok +func (c *connType) updateAggValue(fieldName string, newValue interface{}) { + _, ok := c.aggFields[fieldName] + if !ok { + log.Panicf("tried updating missing field %v", fieldName) + } + c.aggFields[fieldName] = newValue } -func (c *connType) updateAggValue(fieldName string, newValueFn func(curr float64) float64) { +func (c *connType) updateAggFnValue(fieldName string, newValueFn func(curr float64) float64) { v, ok := c.aggFields[fieldName] if !ok { log.Panicf("tried updating missing field %v", fieldName) } - c.aggFields[fieldName] = newValueFn(v) + + // existing value must be float64 for function aggregation + switch value := v.(type) { + case float64: + c.aggFields[fieldName] = newValueFn(value) + default: + log.Panicf("tried to aggregate non float64 field %v value %v", fieldName, v) + } } func (c *connType) setExpiryTime(t time.Time) { @@ -92,7 +102,8 @@ func (c *connType) toGenericMap() config.GenericMap { for k, v := range c.aggFields { gm[k] = v } - // In case of a conflict between the keys and the aggFields, the keys should prevail. + + // In case of a conflict between the keys and the aggFields / cpFields, the keys should prevail. for k, v := range c.keys { gm[k] = v } @@ -171,7 +182,7 @@ type connBuilder struct { func NewConnBuilder(metrics *metricsType) *connBuilder { return &connBuilder{ conn: &connType{ - aggFields: make(map[string]float64), + aggFields: make(map[string]interface{}), keys: config.GenericMap{}, isReported: false, }, diff --git a/pkg/pipeline/extract/conntrack/conntrack.go b/pkg/pipeline/extract/conntrack/conntrack.go index d9dcfd61f..43565e5b8 100644 --- a/pkg/pipeline/extract/conntrack/conntrack.go +++ b/pkg/pipeline/extract/conntrack/conntrack.go @@ -72,7 +72,7 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericM log.Debugf("skipping duplicated flow log %v", fl) ct.metrics.inputRecords.WithLabelValues("duplicate").Inc() } else { - conn, exists := ct.connStore.getConnection(computedHash.hashTotal) + conn, exists, _ := ct.connStore.getConnection(computedHash.hashTotal) if !exists { if (ct.config.MaxConnectionsTracked > 0) && (ct.connStore.len() >= ct.config.MaxConnectionsTracked) { log.Warningf("too many connections; skipping flow log %v: ", fl) @@ -80,14 +80,15 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericM } else { builder := NewConnBuilder(ct.metrics) conn = builder. - ShouldSwapAB(ct.config.TCPFlags.SwapAB && ct.shouldSwapAB(fl)). + ShouldSwapAB(ct.config.TCPFlags.SwapAB && ct.containsTcpFlag(fl, SYN_ACK_FLAG)). + Hash(computedHash). KeysFrom(fl, ct.config.KeyDefinition, ct.endpointAFields, ct.endpointBFields). Aggregators(ct.aggregators). Hash(computedHash). Build() ct.connStore.addConnection(computedHash.hashTotal, conn) ct.connStore.updateNextHeartbeatTime(computedHash.hashTotal) - ct.updateConnection(conn, fl, computedHash) + ct.updateConnection(conn, fl, computedHash, true) ct.metrics.inputRecords.WithLabelValues("newConnection").Inc() if ct.shouldOutputNewConnection { record := conn.toGenericMap() @@ -100,7 +101,7 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericM } } } else { - ct.updateConnection(conn, fl, computedHash) + ct.updateConnection(conn, fl, computedHash, false) ct.metrics.inputRecords.WithLabelValues("update").Inc() } } @@ -167,28 +168,20 @@ func (ct *conntrackImpl) prepareHeartbeatRecords() []config.GenericMap { return outputRecords } -func (ct *conntrackImpl) updateConnection(conn connection, flowLog config.GenericMap, flowLogHash totalHashType) { +func (ct *conntrackImpl) updateConnection(conn connection, flowLog config.GenericMap, flowLogHash totalHashType, isNew bool) { d := ct.getFlowLogDirection(conn, flowLogHash) for _, agg := range ct.aggregators { - agg.update(conn, flowLog, d) + agg.update(conn, flowLog, d, isNew) } - if ct.config.TCPFlags.DetectEndConnection && ct.isLastFlowLogOfConnection(flowLog) { + if ct.config.TCPFlags.DetectEndConnection && ct.containsTcpFlag(flowLog, FIN_FLAG) { ct.metrics.tcpFlags.WithLabelValues("detectEndConnection").Inc() - ct.connStore.expireConnection(flowLogHash.hashTotal) + ct.connStore.setConnectionTerminating(flowLogHash.hashTotal) } else { ct.connStore.updateConnectionExpiryTime(flowLogHash.hashTotal) } } -func (ct *conntrackImpl) isLastFlowLogOfConnection(flowLog config.GenericMap) bool { - return ct.containsTcpFlag(flowLog, FIN_ACK_FLAG) -} - -func (ct *conntrackImpl) shouldSwapAB(flowLog config.GenericMap) bool { - return ct.containsTcpFlag(flowLog, SYN_ACK_FLAG) -} - func (ct *conntrackImpl) containsTcpFlag(flowLog config.GenericMap, queryFlag uint32) bool { tcpFlagsRaw, ok := flowLog[ct.config.TCPFlags.FieldName] if ok { diff --git a/pkg/pipeline/extract/conntrack/conntrack_test.go b/pkg/pipeline/extract/conntrack/conntrack_test.go index 49d118916..f3f211437 100644 --- a/pkg/pipeline/extract/conntrack/conntrack_test.go +++ b/pkg/pipeline/extract/conntrack/conntrack_test.go @@ -37,7 +37,7 @@ import ( var opMetrics = operational.NewMetrics(&config.MetricsSettings{}) func buildMockConnTrackConfig(isBidirectional bool, outputRecordType []string, - heartbeatInterval, endConnectionTimeout time.Duration) *config.StageParam { + heartbeatInterval, endConnectionTimeout time.Duration, terminatingTimeout time.Duration) *config.StageParam { splitAB := isBidirectional var hash api.ConnTrackHash if isBidirectional { @@ -91,6 +91,7 @@ func buildMockConnTrackConfig(isBidirectional bool, outputRecordType []string, Selector: map[string]interface{}{}, HeartbeatInterval: api.Duration{Duration: heartbeatInterval}, EndConnectionTimeout: api.Duration{Duration: endConnectionTimeout}, + TerminatingTimeout: api.Duration{Duration: terminatingTimeout}, }, }, }, // end of api.ConnTrack @@ -101,29 +102,41 @@ func buildMockConnTrackConfig(isBidirectional bool, outputRecordType []string, func TestTrack(t *testing.T) { heartbeatInterval := 10 * time.Second endConnectionTimeout := 30 * time.Second + terminatingTimeout := 5 * time.Second + ipA := "10.0.0.1" ipB := "10.0.0.2" portA := 9001 portB := 9002 protocol := 6 + flowDir := 0 hashId := "705baa5149302fa1" hashIdAB := "705baa5149302fa1" hashIdBA := "cc40f571f40f3111" - flAB1 := newMockFlowLog(ipA, portA, ipB, portB, protocol, 111, 11, false) - flAB1Duplicated := newMockFlowLog(ipA, portA, ipB, portB, protocol, 111, 11, true) - flAB2 := newMockFlowLog(ipA, portA, ipB, portB, protocol, 222, 22, false) - flBA3 := newMockFlowLog(ipB, portB, ipA, portA, protocol, 333, 33, false) - flBA4 := newMockFlowLog(ipB, portB, ipA, portA, protocol, 444, 44, false) + flAB1 := newMockFlowLog(ipA, portA, ipB, portB, protocol, flowDir, 111, 11, false) + // duplicates should be ignored + flAB1Duplicated := newMockFlowLog(ipA, portA, ipB, portB, protocol, flowDir, 111, 11, true) + flAB2 := newMockFlowLog(ipA, portA, ipB, portB, protocol, flowDir, 222, 22, false) + flBA3 := newMockFlowLog(ipB, portB, ipA, portA, protocol, flowDir, 333, 33, false) + flBA4 := newMockFlowLog(ipB, portB, ipA, portA, protocol, flowDir, 444, 44, false) table := []struct { name string conf *config.StageParam inputFlowLogs []config.GenericMap expected []config.GenericMap }{ + { + "duplicates, doesn't output connection events", + buildMockConnTrackConfig(true, []string{"newConnection", "heartbeat", "endConnection"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout), + []config.GenericMap{flAB1Duplicated}, + []config.GenericMap(nil), + }, { "bidirectional, output new connection", - buildMockConnTrackConfig(true, []string{"newConnection"}, heartbeatInterval, endConnectionTimeout), + buildMockConnTrackConfig(true, []string{"newConnection"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout), []config.GenericMap{flAB1, flAB1Duplicated, flAB2, flBA3, flBA4}, []config.GenericMap{ newMockRecordNewConnAB(ipA, portA, ipB, portB, protocol, 111, 0, 11, 0, 1).withHash(hashId).get(), @@ -131,7 +144,8 @@ func TestTrack(t *testing.T) { }, { "bidirectional, output new connection and flow log", - buildMockConnTrackConfig(true, []string{"newConnection", "flowLog"}, heartbeatInterval, endConnectionTimeout), + buildMockConnTrackConfig(true, []string{"newConnection", "flowLog"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout), []config.GenericMap{flAB1, flAB1Duplicated, flAB2, flBA3, flBA4}, []config.GenericMap{ newMockRecordNewConnAB(ipA, portA, ipB, portB, protocol, 111, 0, 11, 0, 1).withHash(hashId).get(), @@ -144,7 +158,8 @@ func TestTrack(t *testing.T) { }, { "unidirectional, output new connection", - buildMockConnTrackConfig(false, []string{"newConnection"}, heartbeatInterval, endConnectionTimeout), + buildMockConnTrackConfig(false, []string{"newConnection"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout), []config.GenericMap{flAB1, flAB1Duplicated, flAB2, flBA3, flBA4}, []config.GenericMap{ newMockRecordNewConn(ipA, portA, ipB, portB, protocol, 111, 11, 1).withHash(hashIdAB).get(), @@ -153,7 +168,8 @@ func TestTrack(t *testing.T) { }, { "unidirectional, output new connection and flow log", - buildMockConnTrackConfig(false, []string{"newConnection", "flowLog"}, heartbeatInterval, endConnectionTimeout), + buildMockConnTrackConfig(false, []string{"newConnection", "flowLog"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout), []config.GenericMap{flAB1, flAB1Duplicated, flAB2, flBA3, flBA4}, []config.GenericMap{ newMockRecordNewConn(ipA, portA, ipB, portB, protocol, 111, 11, 1).withHash(hashIdAB).get(), @@ -188,7 +204,10 @@ func TestEndConn_Bidirectional(t *testing.T) { clk := clock.NewMock() heartbeatInterval := 10 * time.Second endConnectionTimeout := 30 * time.Second - conf := buildMockConnTrackConfig(true, []string{"newConnection", "flowLog", "endConnection"}, heartbeatInterval, endConnectionTimeout) + terminatingTimeout := 5 * time.Second + + conf := buildMockConnTrackConfig(true, []string{"newConnection", "flowLog", "endConnection"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout) ct, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) @@ -197,12 +216,13 @@ func TestEndConn_Bidirectional(t *testing.T) { portA := 9001 portB := 9002 protocol := 6 + flowDir := 0 hashId := "705baa5149302fa1" - flAB1 := newMockFlowLog(ipA, portA, ipB, portB, protocol, 111, 11, false) - flAB2 := newMockFlowLog(ipA, portA, ipB, portB, protocol, 222, 22, false) - flBA3 := newMockFlowLog(ipB, portB, ipA, portA, protocol, 333, 33, false) - flBA4 := newMockFlowLog(ipB, portB, ipA, portA, protocol, 444, 44, false) + flAB1 := newMockFlowLog(ipA, portA, ipB, portB, protocol, flowDir, 111, 11, false) + flAB2 := newMockFlowLog(ipA, portA, ipB, portB, protocol, flowDir, 222, 22, false) + flBA3 := newMockFlowLog(ipB, portB, ipA, portA, protocol, flowDir, 333, 33, false) + flBA4 := newMockFlowLog(ipB, portB, ipA, portA, protocol, flowDir, 444, 44, false) startTime := clk.Now() table := []struct { name string @@ -274,7 +294,10 @@ func TestEndConn_Unidirectional(t *testing.T) { clk := clock.NewMock() heartbeatInterval := 10 * time.Second endConnectionTimeout := 30 * time.Second - conf := buildMockConnTrackConfig(false, []string{"newConnection", "flowLog", "endConnection"}, heartbeatInterval, endConnectionTimeout) + terminatingTimeout := 5 * time.Second + + conf := buildMockConnTrackConfig(false, []string{"newConnection", "flowLog", "endConnection"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout) ct, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) @@ -283,13 +306,14 @@ func TestEndConn_Unidirectional(t *testing.T) { portA := 9001 portB := 9002 protocol := 6 + flowDir := 0 hashIdAB := "705baa5149302fa1" hashIdBA := "cc40f571f40f3111" - flAB1 := newMockFlowLog(ipA, portA, ipB, portB, protocol, 111, 11, false) - flAB2 := newMockFlowLog(ipA, portA, ipB, portB, protocol, 222, 22, false) - flBA3 := newMockFlowLog(ipB, portB, ipA, portA, protocol, 333, 33, false) - flBA4 := newMockFlowLog(ipB, portB, ipA, portA, protocol, 444, 44, false) + flAB1 := newMockFlowLog(ipA, portA, ipB, portB, protocol, flowDir, 111, 11, false) + flAB2 := newMockFlowLog(ipA, portA, ipB, portB, protocol, flowDir, 222, 22, false) + flBA3 := newMockFlowLog(ipB, portB, ipA, portA, protocol, flowDir, 333, 33, false) + flBA4 := newMockFlowLog(ipB, portB, ipA, portA, protocol, flowDir, 444, 44, false) startTime := clk.Now() table := []struct { name string @@ -376,7 +400,10 @@ func TestHeartbeat_Unidirectional(t *testing.T) { clk := clock.NewMock() heartbeatInterval := 10 * time.Second endConnectionTimeout := 30 * time.Second - conf := buildMockConnTrackConfig(false, []string{"newConnection", "flowLog", "heartbeat", "endConnection"}, heartbeatInterval, endConnectionTimeout) + terminatingTimeout := 5 * time.Second + + conf := buildMockConnTrackConfig(false, []string{"newConnection", "flowLog", "heartbeat", "endConnection"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout) ct, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) @@ -385,12 +412,13 @@ func TestHeartbeat_Unidirectional(t *testing.T) { portA := 9001 portB := 9002 protocol := 6 + flowDir := 0 hashIdAB := "705baa5149302fa1" hashIdBA := "cc40f571f40f3111" - flAB1 := newMockFlowLog(ipA, portA, ipB, portB, protocol, 111, 11, false) - flAB2 := newMockFlowLog(ipA, portA, ipB, portB, protocol, 222, 22, false) - flBA3 := newMockFlowLog(ipB, portB, ipA, portA, protocol, 333, 33, false) + flAB1 := newMockFlowLog(ipA, portA, ipB, portB, protocol, flowDir, 111, 11, false) + flAB2 := newMockFlowLog(ipA, portA, ipB, portB, protocol, flowDir, 222, 22, false) + flBA3 := newMockFlowLog(ipB, portB, ipA, portA, protocol, flowDir, 333, 33, false) startTime := clk.Now() table := []struct { name string @@ -524,7 +552,10 @@ func TestIsFirst_LongConnection(t *testing.T) { clk := clock.NewMock() heartbeatInterval := 10 * time.Second endConnectionTimeout := 30 * time.Second - conf := buildMockConnTrackConfig(false, []string{"heartbeat", "endConnection"}, heartbeatInterval, endConnectionTimeout) + terminatingTimeout := 5 * time.Second + + conf := buildMockConnTrackConfig(false, []string{"heartbeat", "endConnection"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout) ct, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) @@ -533,8 +564,9 @@ func TestIsFirst_LongConnection(t *testing.T) { portA := 9001 portB := 9002 protocol := 6 + flowDir := 0 hashIdAB := "705baa5149302fa1" - flAB1 := newMockFlowLog(ipA, portA, ipB, portB, protocol, 111, 11, false) + flAB1 := newMockFlowLog(ipA, portA, ipB, portB, protocol, flowDir, 111, 11, false) startTime := clk.Now() table := []struct { name string @@ -612,8 +644,10 @@ func TestIsFirst_ShortConnection(t *testing.T) { clk := clock.NewMock() heartbeatInterval := 10 * time.Second endConnectionTimeout := 5 * time.Second + terminatingTimeout := 5 * time.Second + conf := buildMockConnTrackConfig(false, []string{"heartbeat", "endConnection"}, - heartbeatInterval, endConnectionTimeout) + heartbeatInterval, endConnectionTimeout, terminatingTimeout) ct, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) @@ -622,8 +656,9 @@ func TestIsFirst_ShortConnection(t *testing.T) { portA := 9001 portB := 9002 protocol := 6 + flowDir := 0 hashIdAB := "705baa5149302fa1" - flAB1 := newMockFlowLog(ipA, portA, ipB, portB, protocol, 111, 11, false) + flAB1 := newMockFlowLog(ipA, portA, ipB, portB, protocol, flowDir, 111, 11, false) startTime := clk.Now() table := []struct { name string @@ -675,7 +710,10 @@ func TestPrepareUpdateConnectionRecords(t *testing.T) { clk := clock.NewMock() heartbeatInterval := 10 * time.Second endConnectionTimeout := 30 * time.Second - conf := buildMockConnTrackConfig(false, []string{"heartbeat"}, heartbeatInterval, endConnectionTimeout) + terminatingTimeout := 5 * time.Second + + conf := buildMockConnTrackConfig(false, []string{"heartbeat"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout) interval := 10 * time.Second extract, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) @@ -734,8 +772,10 @@ func TestScheduling(t *testing.T) { clk := clock.NewMock() defaultHeartbeatInterval := 20 * time.Second defaultEndConnectionTimeout := 15 * time.Second + defaultTerminatingTimeout := 5 * time.Second + conf := buildMockConnTrackConfig(true, []string{"heartbeat", "endConnection"}, - defaultHeartbeatInterval, defaultEndConnectionTimeout) + defaultHeartbeatInterval, defaultEndConnectionTimeout, defaultTerminatingTimeout) // Insert a scheduling group before the default group. // https://github.com/golang/go/wiki/SliceTricks#push-frontunshift conf.Extract.ConnTrack.Scheduling = append( @@ -744,6 +784,7 @@ func TestScheduling(t *testing.T) { Selector: map[string]interface{}{"Proto": 1}, // ICMP HeartbeatInterval: api.Duration{Duration: 30 * time.Second}, EndConnectionTimeout: api.Duration{Duration: 20 * time.Second}, + TerminatingTimeout: api.Duration{Duration: 10 * time.Second}, }, }, conf.Extract.ConnTrack.Scheduling...) @@ -756,12 +797,13 @@ func TestScheduling(t *testing.T) { portB := 9002 protocolTCP := 6 protocolICMP := 1 + flowDir := 0 hashIdTCP := "705baa5149302fa1" hashIdICMP := "3dccf73fe57ba06f" - flTCP1 := newMockFlowLog(ipA, portA, ipB, portB, protocolTCP, 111, 11, false) - flTCP2 := newMockFlowLog(ipB, portB, ipA, portA, protocolTCP, 222, 22, false) - flICMP1 := newMockFlowLog(ipA, portA, ipB, portB, protocolICMP, 333, 33, false) - flICMP2 := newMockFlowLog(ipB, portB, ipA, portA, protocolICMP, 444, 44, false) + flTCP1 := newMockFlowLog(ipA, portA, ipB, portB, protocolTCP, flowDir, 111, 11, false) + flTCP2 := newMockFlowLog(ipB, portB, ipA, portA, protocolTCP, flowDir, 222, 22, false) + flICMP1 := newMockFlowLog(ipA, portA, ipB, portB, protocolICMP, flowDir, 333, 33, false) + flICMP2 := newMockFlowLog(ipB, portB, ipA, portA, protocolICMP, flowDir, 444, 44, false) startTime := clk.Now() table := []struct { name string @@ -864,8 +906,9 @@ func assertStoreConsistency(t *testing.T, extractor extract.Extractor) { groupsLenSlice := make([]int, 0) sumGroupsLen := 0 for _, g := range store.groups { - sumGroupsLen += g.mom.Len() - groupsLenSlice = append(groupsLenSlice, g.mom.Len()) + sumGroupsLen += g.terminatingMom.Len() + sumGroupsLen += g.activeMom.Len() + groupsLenSlice = append(groupsLenSlice, g.terminatingMom.Len()+g.activeMom.Len()) } require.Equal(t, hashLen, sumGroupsLen, "hashLen(=%v) != sum(%v)", hashLen, groupsLenSlice) } @@ -894,7 +937,10 @@ func TestMaxConnections(t *testing.T) { clk := clock.NewMock() heartbeatInterval := 10 * time.Second endConnectionTimeout := 30 * time.Second - conf := buildMockConnTrackConfig(true, []string{"newConnection", "flowLog", "endConnection"}, heartbeatInterval, endConnectionTimeout) + terminatingTimeout := 5 * time.Second + + conf := buildMockConnTrackConfig(true, []string{"newConnection", "flowLog", "endConnection"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout) conf.Extract.ConnTrack.MaxConnectionsTracked = maxConnections extract, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) @@ -920,7 +966,10 @@ func TestIsLastFlowLogOfConnection(t *testing.T) { clk := clock.NewMock() heartbeatInterval := 30 * time.Second endConnectionTimeout := 10 * time.Second - conf := buildMockConnTrackConfig(true, []string{}, heartbeatInterval, endConnectionTimeout) + terminatingTimeout := 5 * time.Second + + conf := buildMockConnTrackConfig(true, []string{}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout) tcpFlagsFieldName := "TCPFlags" conf.Extract.ConnTrack.TCPFlags = api.ConnTrackTCPFlags{ FieldName: tcpFlagsFieldName, @@ -936,44 +985,44 @@ func TestIsLastFlowLogOfConnection(t *testing.T) { }{ { "Happy path", - config.GenericMap{tcpFlagsFieldName: uint32(FIN_ACK_FLAG)}, - FIN_ACK_FLAG, + config.GenericMap{tcpFlagsFieldName: uint32(FIN_FLAG)}, + FIN_FLAG, true, }, { "Multiple flags 1", - config.GenericMap{tcpFlagsFieldName: uint32(FIN_ACK_FLAG | SYN_ACK_FLAG)}, - FIN_ACK_FLAG, + config.GenericMap{tcpFlagsFieldName: uint32(FIN_FLAG | SYN_ACK_FLAG)}, + FIN_FLAG, true, }, { "Multiple flags 2", - config.GenericMap{tcpFlagsFieldName: uint32(FIN_ACK_FLAG | SYN_ACK_FLAG)}, + config.GenericMap{tcpFlagsFieldName: uint32(FIN_FLAG | SYN_ACK_FLAG)}, SYN_ACK_FLAG, true, }, { "Convert from string", - config.GenericMap{tcpFlagsFieldName: fmt.Sprint(FIN_ACK_FLAG)}, - FIN_ACK_FLAG, + config.GenericMap{tcpFlagsFieldName: fmt.Sprint(FIN_FLAG)}, + FIN_FLAG, true, }, { "Cannot parse value", config.GenericMap{tcpFlagsFieldName: ""}, - FIN_ACK_FLAG, + FIN_FLAG, false, }, { - "Other flag than FIN_ACK", - config.GenericMap{tcpFlagsFieldName: FIN_FLAG}, - FIN_ACK_FLAG, + "Other flag than FIN", + config.GenericMap{tcpFlagsFieldName: FIN_ACK_FLAG}, + FIN_FLAG, false, }, { "Missing TCPFlags field", config.GenericMap{"": ""}, - FIN_ACK_FLAG, + FIN_FLAG, false, }, } @@ -992,8 +1041,10 @@ func TestDetectEndConnection(t *testing.T) { clk := clock.NewMock() defaultUpdateConnectionInterval := 30 * time.Second defaultEndConnectionTimeout := 10 * time.Second + defaultTerminatingTimeout := 5 * time.Second + conf := buildMockConnTrackConfig(true, []string{"newConnection", "endConnection"}, - defaultUpdateConnectionInterval, defaultEndConnectionTimeout) + defaultUpdateConnectionInterval, defaultEndConnectionTimeout, defaultTerminatingTimeout) tcpFlagsFieldName := "TCPFlags" conf.Extract.ConnTrack.TCPFlags = api.ConnTrackTCPFlags{ FieldName: tcpFlagsFieldName, @@ -1007,10 +1058,14 @@ func TestDetectEndConnection(t *testing.T) { portA := 9001 portB := 9002 protocolTCP := 6 + flowDir := 0 hashIdTCP := "705baa5149302fa1" - flTCP1 := newMockFlowLog(ipA, portA, ipB, portB, protocolTCP, 111, 11, false) - flTCP2 := newMockFlowLog(ipB, portB, ipA, portA, protocolTCP, 222, 22, false) - flTCP2[tcpFlagsFieldName] = FIN_ACK_FLAG + flTCP1 := newMockFlowLog(ipA, portA, ipB, portB, protocolTCP, flowDir, 111, 11, false) + flTCP2 := newMockFlowLog(ipB, portB, ipA, portA, protocolTCP, flowDir, 222, 22, false) + // duplicates should be ignored + flTCP2Duplicated := newMockFlowLog(ipB, portB, ipA, portA, protocolTCP, flowDir, 222, 22, true) + + flTCP2[tcpFlagsFieldName] = FIN_FLAG startTime := clk.Now() table := []struct { @@ -1031,15 +1086,21 @@ func TestDetectEndConnection(t *testing.T) { "5s: end connection", startTime.Add(5 * time.Second), []config.GenericMap{flTCP2}, - []config.GenericMap{ - newMockRecordEndConnAB(ipA, portA, ipB, portB, protocolTCP, 111, 222, 11, 22, 2).withHash(hashIdTCP).get(), - }, + []config.GenericMap(nil), }, { - "16s: no end connection", - startTime.Add(16 * time.Second), + "6s: end connection duplicated", + startTime.Add(6 * time.Second), + []config.GenericMap{flTCP2Duplicated}, + []config.GenericMap(nil), + }, + { + "20s: end connection without duplicated", + startTime.Add(20 * time.Second), []config.GenericMap{}, - nil, + []config.GenericMap{ + newMockRecordEndConnAB(ipA, portA, ipB, portB, protocolTCP, 111, 222, 11, 22, 2).withHash(hashIdTCP).get(), + }, }, } @@ -1056,6 +1117,7 @@ func TestDetectEndConnection(t *testing.T) { } exposed := test.ReadExposedMetrics(t) require.Contains(t, exposed, `conntrack_tcp_flags{action="detectEndConnection"} 1`) + require.Contains(t, exposed, `conntrack_input_records{classification="duplicate"} 1`) } func TestSwapAB(t *testing.T) { @@ -1063,8 +1125,10 @@ func TestSwapAB(t *testing.T) { clk := clock.NewMock() defaultUpdateConnectionInterval := 30 * time.Second defaultEndConnectionTimeout := 10 * time.Second + defaultTerminatingTimeout := 5 * time.Second + conf := buildMockConnTrackConfig(true, []string{"newConnection", "endConnection"}, - defaultUpdateConnectionInterval, defaultEndConnectionTimeout) + defaultUpdateConnectionInterval, defaultEndConnectionTimeout, defaultTerminatingTimeout) tcpFlagsFieldName := "TCPFlags" conf.Extract.ConnTrack.TCPFlags = api.ConnTrackTCPFlags{ FieldName: tcpFlagsFieldName, @@ -1078,8 +1142,9 @@ func TestSwapAB(t *testing.T) { portA := 9001 portB := 9002 protocolTCP := 6 + flowDir := 0 hashIdTCP := "705baa5149302fa1" - flTCP1 := newMockFlowLog(ipB, portB, ipA, portA, protocolTCP, 111, 11, false) + flTCP1 := newMockFlowLog(ipB, portB, ipA, portA, protocolTCP, flowDir, 111, 11, false) flTCP1[tcpFlagsFieldName] = SYN_ACK_FLAG startTime := clk.Now() @@ -1113,3 +1178,91 @@ func TestSwapAB(t *testing.T) { exposed := test.ReadExposedMetrics(t) require.Contains(t, exposed, `conntrack_tcp_flags{action="swapAB"} 1`) } + +func TestExpiringConnection(t *testing.T) { + test.ResetPromRegistry() + clk := clock.NewMock() + defaultUpdateConnectionInterval := 30 * time.Second + defaultEndConnectionTimeout := 10 * time.Second + defaultTerminatingTimeout := 5 * time.Second + + conf := buildMockConnTrackConfig(true, []string{"newConnection", "endConnection"}, + defaultUpdateConnectionInterval, defaultEndConnectionTimeout, defaultTerminatingTimeout) + tcpFlagsFieldName := "TCPFlags" + conf.Extract.ConnTrack.TCPFlags = api.ConnTrackTCPFlags{ + FieldName: tcpFlagsFieldName, + DetectEndConnection: true, + } + ct, err := NewConnectionTrack(opMetrics, *conf, clk) + require.NoError(t, err) + + ipA := "10.0.0.1" + ipB := "10.0.0.2" + portA := 9001 + portB := 9002 + protocolTCP := 6 + flowDir := 0 + hashIdTCP := "705baa5149302fa1" + flTCP1 := newMockFlowLog(ipA, portA, ipB, portB, protocolTCP, flowDir, 111, 11, false) + flTCP2 := newMockFlowLog(ipB, portB, ipA, portA, protocolTCP, flowDir, 222, 22, false) + flTCP2[tcpFlagsFieldName] = FIN_FLAG + flTCP3 := newMockFlowLog(ipA, portA, ipB, portB, protocolTCP, flowDir, 333, 33, false) + flTCP4 := newMockFlowLog(ipA, portA, ipB, portB, protocolTCP, flowDir, 555, 55, false) + + startTime := clk.Now() + table := []struct { + name string + time time.Time + inputFlowLogs []config.GenericMap + expected []config.GenericMap + }{ + { + "start: new connection", + startTime.Add(0 * time.Second), + []config.GenericMap{flTCP1}, + []config.GenericMap{ + newMockRecordNewConnAB(ipA, portA, ipB, portB, protocolTCP, 111, 0, 11, 0, 1).withHash(hashIdTCP).markFirst().get(), + }, + }, + { + "5s: FIN flow log to end the connection", + startTime.Add(5 * time.Second), + []config.GenericMap{flTCP2}, + []config.GenericMap(nil), + }, + { + "10s: flow log after FIN is still part of the connection", + startTime.Add(10 * time.Second), + []config.GenericMap{flTCP3}, + []config.GenericMap(nil), + }, + { + "16s: end connection. The after-FIN-flow-log doesn't extend the connection's life", + startTime.Add(16 * time.Second), + []config.GenericMap{}, + []config.GenericMap{ + newMockRecordEndConnAB(ipA, portA, ipB, portB, protocolTCP, 444, 222, 44, 22, 3).withHash(hashIdTCP).get(), + }, + }, + { + "17s: another flow log will create a new connection", + startTime.Add(17 * time.Second), + []config.GenericMap{flTCP4}, + []config.GenericMap{ + newMockRecordNewConnAB(ipA, portA, ipB, portB, protocolTCP, 555, 0, 55, 0, 1).withHash(hashIdTCP).markFirst().get(), + }, + }, + } + + var prevTime time.Time + for _, tt := range table { + t.Run(tt.name, func(t *testing.T) { + require.Less(t, prevTime, tt.time) + prevTime = tt.time + clk.Set(tt.time) + actual := ct.Extract(tt.inputFlowLogs) + require.Equal(t, tt.expected, actual) + assertStoreConsistency(t, ct) + }) + } +} diff --git a/pkg/pipeline/extract/conntrack/hash_test.go b/pkg/pipeline/extract/conntrack/hash_test.go index f8ee2ab25..a1823a238 100644 --- a/pkg/pipeline/extract/conntrack/hash_test.go +++ b/pkg/pipeline/extract/conntrack/hash_test.go @@ -62,6 +62,7 @@ func TestComputeHash_Unidirectional(t *testing.T) { portB := 9002 protocolA := 6 protocolB := 7 + flowDir := 0 table := []struct { name string flowLog1 config.GenericMap @@ -70,32 +71,32 @@ func TestComputeHash_Unidirectional(t *testing.T) { }{ { "Same IP, port and protocol", - newMockFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22, false), - newMockFlowLog(ipA, portA, ipB, portB, protocolA, 222, 11, false), + newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDir, 111, 22, false), + newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDir, 222, 11, false), true, }, { "Alternating ip+port", - newMockFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22, false), - newMockFlowLog(ipB, portB, ipA, portA, protocolA, 222, 11, false), + newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDir, 111, 22, false), + newMockFlowLog(ipB, portB, ipA, portA, protocolA, flowDir, 222, 11, false), false, }, { "Alternating ip", - newMockFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22, false), - newMockFlowLog(ipB, portA, ipA, portB, protocolA, 222, 11, false), + newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDir, 111, 22, false), + newMockFlowLog(ipB, portA, ipA, portB, protocolA, flowDir, 222, 11, false), false, }, { "Alternating port", - newMockFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22, false), - newMockFlowLog(ipA, portB, ipB, portA, protocolA, 222, 11, false), + newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDir, 111, 22, false), + newMockFlowLog(ipA, portB, ipB, portA, protocolA, flowDir, 222, 11, false), false, }, { "Same IP+port, different protocol", - newMockFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22, false), - newMockFlowLog(ipA, portA, ipB, portB, protocolB, 222, 11, false), + newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDir, 111, 22, false), + newMockFlowLog(ipA, portA, ipB, portB, protocolB, flowDir, 222, 11, false), false, }, } @@ -150,6 +151,7 @@ func TestComputeHash_Bidirectional(t *testing.T) { portB := 9002 protocolA := 6 protocolB := 7 + flowDir := 0 table := []struct { name string flowLog1 config.GenericMap @@ -158,32 +160,32 @@ func TestComputeHash_Bidirectional(t *testing.T) { }{ { "Same IP, port and protocol", - newMockFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22, false), - newMockFlowLog(ipA, portA, ipB, portB, protocolA, 222, 11, false), + newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDir, 111, 22, false), + newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDir, 222, 11, false), true, }, { "Alternating ip+port", - newMockFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22, false), - newMockFlowLog(ipB, portB, ipA, portA, protocolA, 222, 11, false), + newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDir, 111, 22, false), + newMockFlowLog(ipB, portB, ipA, portA, protocolA, flowDir, 222, 11, false), true, }, { "Alternating ip", - newMockFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22, false), - newMockFlowLog(ipB, portA, ipA, portB, protocolA, 222, 11, false), + newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDir, 111, 22, false), + newMockFlowLog(ipB, portA, ipA, portB, protocolA, flowDir, 222, 11, false), false, }, { "Alternating port", - newMockFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22, false), - newMockFlowLog(ipA, portB, ipB, portA, protocolA, 222, 11, false), + newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDir, 111, 22, false), + newMockFlowLog(ipA, portB, ipB, portA, protocolA, flowDir, 222, 11, false), false, }, { "Same IP+port, different protocol", - newMockFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22, false), - newMockFlowLog(ipA, portA, ipB, portB, protocolB, 222, 11, false), + newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDir, 111, 22, false), + newMockFlowLog(ipA, portA, ipB, portB, protocolB, flowDir, 222, 11, false), false, }, } @@ -223,8 +225,9 @@ func TestComputeHash_MissingField(t *testing.T) { portA := 1 portB := 9002 protocolA := 6 + flowDir := 0 - fl := newMockFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22, false) + fl := newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDir, 111, 22, false) h, err := ComputeHash(fl, keyDefinition, testHasher) require.NoError(t, err) diff --git a/pkg/pipeline/extract/conntrack/metrics.go b/pkg/pipeline/extract/conntrack/metrics.go index e26f9e164..d91d1cf78 100644 --- a/pkg/pipeline/extract/conntrack/metrics.go +++ b/pkg/pipeline/extract/conntrack/metrics.go @@ -25,9 +25,9 @@ import ( var ( connStoreLengthDef = operational.DefineMetric( "conntrack_memory_connections", - "The total number of tracked connections in memory.", + "The total number of tracked connections in memory per group and phase.", operational.TypeGauge, - "group", + "group", "phase", ) inputRecordsDef = operational.DefineMetric( diff --git a/pkg/pipeline/extract/conntrack/store.go b/pkg/pipeline/extract/conntrack/store.go index de9747e5a..8d842543e 100644 --- a/pkg/pipeline/extract/conntrack/store.go +++ b/pkg/pipeline/extract/conntrack/store.go @@ -31,6 +31,8 @@ import ( const ( expiryOrder = utils.OrderID("expiryOrder") nextHeartbeatTimeOrder = utils.OrderID("nextHeartbeatTimeOrder") + activeLabel = "active" + terminatingLabel = "terminating" ) // connectionStore provides means to manage the connections such as retrieving a connection by its hash and organizing @@ -45,8 +47,12 @@ type connectionStore struct { type groupType struct { scheduling api.ConnTrackSchedulingGroup - mom *utils.MultiOrderedMap - labelValue string + // active connections + activeMom *utils.MultiOrderedMap + // connections that detected EndConnection from TCP FIN flag. These will not trigger updates anymore until pop + // check expireConnection func + terminatingMom *utils.MultiOrderedMap + labelValue string } func (cs *connectionStore) getGroupIdx(conn connection) (groupIdx int) { @@ -64,7 +70,7 @@ func (cs *connectionStore) getGroupIdx(conn connection) (groupIdx int) { func (cs *connectionStore) addConnection(hashId uint64, conn connection) { groupIdx := cs.getGroupIdx(conn) - mom := cs.groups[groupIdx].mom + mom := cs.groups[groupIdx].activeMom err := mom.AddRecord(utils.Key(hashId), conn) if err != nil { @@ -73,51 +79,73 @@ func (cs *connectionStore) addConnection(hashId uint64, conn connection) { cs.hashId2groupIdx[hashId] = groupIdx groupLabel := cs.groups[groupIdx].labelValue - groupLen := cs.groups[groupIdx].mom.Len() - cs.metrics.connStoreLength.WithLabelValues(groupLabel).Set(float64(groupLen)) + activeLen := cs.groups[groupIdx].activeMom.Len() + cs.metrics.connStoreLength.WithLabelValues(groupLabel, activeLabel).Set(float64(activeLen)) } -func (cs *connectionStore) getConnection(hashId uint64) (connection, bool) { +func (cs *connectionStore) getConnection(hashId uint64) (connection, bool, bool) { groupIdx, found := cs.hashId2groupIdx[hashId] if !found { - return nil, false + return nil, false, false } - mom := cs.groups[groupIdx].mom + mom := cs.groups[groupIdx].activeMom + // get connection from active map + isRunning := true record, ok := mom.GetRecord(utils.Key(hashId)) if !ok { - return nil, false + // fallback on terminating map if not found + isRunning = false + mom := cs.groups[groupIdx].terminatingMom + record, ok = mom.GetRecord(utils.Key(hashId)) + if !ok { + return nil, false, false + } } conn := record.(connection) - return conn, true + return conn, true, isRunning } -func (cs *connectionStore) expireConnection(hashId uint64) { - conn, ok := cs.getConnection(hashId) +func (cs *connectionStore) setConnectionTerminating(hashId uint64) { + conn, ok, active := cs.getConnection(hashId) if !ok { log.Panicf("BUG. connection hash %x doesn't exist", hashId) return + } else if !active { + // connection is terminating + return } groupIdx := cs.hashId2groupIdx[hashId] - mom := cs.groups[groupIdx].mom - // Set the expiry time to the zero value of Time - conn.setExpiryTime(time.Time{}) - // Move to the front of the list - err := mom.MoveToFront(utils.Key(hashId), expiryOrder) + groupLabel := cs.groups[groupIdx].labelValue + activeMom := cs.groups[groupIdx].activeMom + terminatingMom := cs.groups[groupIdx].terminatingMom + timeout := cs.groups[groupIdx].scheduling.TerminatingTimeout.Duration + newExpiryTime := cs.now().Add(timeout) + conn.setExpiryTime(newExpiryTime) + // Remove connection from active map + activeMom.RemoveRecord(utils.Key(hashId)) + activeLen := cs.groups[groupIdx].activeMom.Len() + cs.metrics.connStoreLength.WithLabelValues(groupLabel, activeLabel).Set(float64(activeLen)) + // Add connection to terminating map + err := terminatingMom.AddRecord(utils.Key(hashId), conn) if err != nil { - log.Panicf("BUG. Can't update connection expiry time for hash %x: %v", hashId, err) - return + log.Errorf("BUG. connection with hash %x already exists in store. %v", hashId, conn) } + terminatingLen := cs.groups[groupIdx].terminatingMom.Len() + cs.metrics.connStoreLength.WithLabelValues(groupLabel, terminatingLabel).Set(float64(terminatingLen)) } func (cs *connectionStore) updateConnectionExpiryTime(hashId uint64) { - conn, ok := cs.getConnection(hashId) + conn, ok, active := cs.getConnection(hashId) if !ok { log.Panicf("BUG. connection hash %x doesn't exist", hashId) return + } else if !active { + // connection is terminating. expiry time can't be updated anymore + return } groupIdx := cs.hashId2groupIdx[hashId] - mom := cs.groups[groupIdx].mom + mom := cs.groups[groupIdx].activeMom timeout := cs.groups[groupIdx].scheduling.EndConnectionTimeout.Duration newExpiryTime := cs.now().Add(timeout) conn.setExpiryTime(newExpiryTime) @@ -130,13 +158,16 @@ func (cs *connectionStore) updateConnectionExpiryTime(hashId uint64) { } func (cs *connectionStore) updateNextHeartbeatTime(hashId uint64) { - conn, ok := cs.getConnection(hashId) + conn, ok, active := cs.getConnection(hashId) if !ok { log.Panicf("BUG. connection hash %x doesn't exist", hashId) return + } else if !active { + // connection is terminating. heartbeat are disabled + return } groupIdx := cs.hashId2groupIdx[hashId] - mom := cs.groups[groupIdx].mom + mom := cs.groups[groupIdx].activeMom timeout := cs.groups[groupIdx].scheduling.HeartbeatInterval.Duration newNextHeartbeatTime := cs.now().Add(timeout) conn.setNextHeartbeatTime(newNextHeartbeatTime) @@ -148,28 +179,46 @@ func (cs *connectionStore) updateNextHeartbeatTime(hashId uint64) { } } +func (cs *connectionStore) popEndConnectionOfMap(mom *utils.MultiOrderedMap, group *groupType) []connection { + var poppedConnections []connection + + mom.IterateFrontToBack(expiryOrder, func(r utils.Record) (shouldDelete, shouldStop bool) { + conn := r.(connection) + expiryTime := conn.getExpiryTime() + if cs.now().After(expiryTime) { + // The connection has expired. We want to pop it. + poppedConnections = append(poppedConnections, conn) + shouldDelete, shouldStop = true, false + delete(cs.hashId2groupIdx, conn.getHash().hashTotal) + } else { + // No more expired connections + shouldDelete, shouldStop = false, true + } + return + }) + groupLabel := group.labelValue + momLen := mom.Len() + var phaseLabel string + switch mom { + case group.activeMom: + phaseLabel = activeLabel + case group.terminatingMom: + phaseLabel = terminatingLabel + } + cs.metrics.connStoreLength.WithLabelValues(groupLabel, phaseLabel).Set(float64(momLen)) + + return poppedConnections +} + func (cs *connectionStore) popEndConnections() []connection { // Iterate over the connections by scheduling groups. // In each scheduling group iterate over them by their expiry time from old to new. var poppedConnections []connection for _, group := range cs.groups { - group.mom.IterateFrontToBack(expiryOrder, func(r utils.Record) (shouldDelete, shouldStop bool) { - conn := r.(connection) - expiryTime := conn.getExpiryTime() - if cs.now().After(expiryTime) { - // The connection has expired. We want to pop it. - poppedConnections = append(poppedConnections, conn) - shouldDelete, shouldStop = true, false - delete(cs.hashId2groupIdx, conn.getHash().hashTotal) - } else { - // No more expired connections - shouldDelete, shouldStop = false, true - } - return - }) - groupLabel := group.labelValue - groupLen := group.mom.Len() - cs.metrics.connStoreLength.WithLabelValues(groupLabel).Set(float64(groupLen)) + // Pop terminating connections first + poppedConnections = append(poppedConnections, cs.popEndConnectionOfMap(group.terminatingMom, group)...) + // Pop active connections that expired without TCP flag + poppedConnections = append(poppedConnections, cs.popEndConnectionOfMap(group.activeMom, group)...) } return poppedConnections } @@ -179,7 +228,7 @@ func (cs *connectionStore) prepareHeartbeats() []connection { // Iterate over the connections by scheduling groups. // In each scheduling group iterate over them by their next heartbeat time from old to new. for _, group := range cs.groups { - group.mom.IterateFrontToBack(nextHeartbeatTimeOrder, func(r utils.Record) (shouldDelete, shouldStop bool) { + group.activeMom.IterateFrontToBack(nextHeartbeatTimeOrder, func(r utils.Record) (shouldDelete, shouldStop bool) { conn := r.(connection) nextHeartbeat := conn.getNextHeartbeatTime() needToReport := cs.now().After(nextHeartbeat) @@ -223,9 +272,10 @@ func newConnectionStore(scheduling []api.ConnTrackSchedulingGroup, metrics *metr groups := make([]*groupType, len(scheduling)) for groupIdx, sg := range scheduling { groups[groupIdx] = &groupType{ - scheduling: sg, - mom: utils.NewMultiOrderedMap(expiryOrder, nextHeartbeatTimeOrder), - labelValue: schedulingGroupToLabelValue(groupIdx, sg), + scheduling: sg, + activeMom: utils.NewMultiOrderedMap(expiryOrder, nextHeartbeatTimeOrder), + terminatingMom: utils.NewMultiOrderedMap(expiryOrder, nextHeartbeatTimeOrder), + labelValue: schedulingGroupToLabelValue(groupIdx, sg), } } diff --git a/pkg/pipeline/extract/conntrack/utils_test.go b/pkg/pipeline/extract/conntrack/utils_test.go index 678a22e2e..e7cb87c18 100644 --- a/pkg/pipeline/extract/conntrack/utils_test.go +++ b/pkg/pipeline/extract/conntrack/utils_test.go @@ -22,16 +22,17 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" ) -func newMockFlowLog(srcIP string, srcPort int, dstIP string, dstPort int, protocol int, bytes, packets int, duplicate bool) config.GenericMap { +func newMockFlowLog(srcIP string, srcPort int, dstIP string, dstPort int, protocol int, direction int, bytes, packets int, duplicate bool) config.GenericMap { return config.GenericMap{ - "SrcAddr": srcIP, - "SrcPort": srcPort, - "DstAddr": dstIP, - "DstPort": dstPort, - "Proto": protocol, - "Bytes": bytes, - "Packets": packets, - "Duplicate": duplicate, + "SrcAddr": srcIP, + "SrcPort": srcPort, + "DstAddr": dstIP, + "DstPort": dstPort, + "Proto": protocol, + "FlowDirection": direction, + "Bytes": bytes, + "Packets": packets, + "Duplicate": duplicate, } }