conntrack copy agg + fix duplicates (#413)
jpinsonneau committed Apr 12, 2023
1 parent cfadce6 commit 381890d
Showing 16 changed files with 468 additions and 198 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -631,9 +631,11 @@ parameters:
Proto: 17
endConnectionTimeout: 5s
heartbeatInterval: 40s
terminatingTimeout: 5s
- selector: {} # Default group
endConnectionTimeout: 10s
heartbeatInterval: 30s
terminatingTimeout: 5s
tcpFlags:
fieldName: Flags
detectEndConnection: true
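The scheduling list above pairs a selector with per-group timeouts; the empty selector {} acts as the default group, so a connection with Proto: 17 uses the first group's 5s endConnectionTimeout while other connections fall back to the default group's 10s. As an illustration of what selector matching means, here is a standalone Go sketch (the helper name and map types are assumptions, not the pipeline's actual code):

// selectorMatches reports whether every key/value pair of a scheduling group's
// selector is present in the connection's key fields. An empty selector
// matches every connection, which is why it serves as the default group.
func selectorMatches(selector, connKeys map[string]interface{}) bool {
	for k, want := range selector {
		if got, ok := connKeys[k]; !ok || got != want {
			return false
		}
	}
	return true
}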
1 change: 1 addition & 0 deletions contrib/kubernetes/flowlogs-pipeline.conf.yaml
@@ -107,6 +107,7 @@ parameters:
scheduling:
- endConnectionTimeout: 10s
heartbeatInterval: 30s
terminatingTimeout: 5s
tcpFlags:
fieldName: TCPFlags
detectEndConnection: true
5 changes: 4 additions & 1 deletion docs/api.md
@@ -225,16 +225,19 @@ Following is the supported API format for specifying connection tracking:
count: count
min: min
max: max
first: first
last: last
splitAB: When true, 2 output fields will be created. One for A->B and one for B->A flows.
input: The input field to base the operation on. When omitted, 'name' is used
scheduling: list of timeouts and intervals to apply per selector
selector: key-value map to match against connection fields to apply this scheduling
endConnectionTimeout: duration of time to wait from the last flow log to end a connection
terminatingTimeout: duration of time to wait from detected FIN flag to end a connection
heartbeatInterval: duration of time to wait between heartbeat reports of a connection
maxConnectionsTracked: maximum number of connections we keep in our cache (0 means no limit)
tcpFlags: settings for handling TCP flags
fieldName: name of the field containing TCP flags
detectEndConnection: detect end connections by FIN_ACK flag
detectEndConnection: detect end connections by FIN flag
swapAB: swap source and destination when the first flowlog contains the SYN_ACK flag
</pre>
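For reference, the new first and last operations are declared like any other output field. A minimal Go sketch using the api package types (the field values mirror the aggregator tests further down; the surrounding program is an assumption, and splitAB must stay false for these two operations):

package main

import (
	"fmt"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
)

func main() {
	// Copy a value from the first and from the last flow log of each connection.
	firstLast := []api.OutputField{
		{Name: "FirstFlowDirection", Operation: "first", Input: "FlowDirection"},
		{Name: "LastFlowDirection", Operation: "last", Input: "FlowDirection"},
	}
	fmt.Printf("%+v\n", firstLast)
}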
## Time-based Filters API
4 changes: 2 additions & 2 deletions docs/operational-metrics.md
@@ -18,9 +18,9 @@ Each table below provides documentation for an exported flowlogs-pipeline operat
### conntrack_memory_connections
| **Name** | conntrack_memory_connections |
|:---|:---|
| **Description** | The total number of tracked connections in memory. |
| **Description** | The total number of tracked connections in memory per group and phase. |
| **Type** | gauge |
| **Labels** | group |
| **Labels** | group, phase |
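
As a hedged sketch of how a gauge with these labels could be declared with the Prometheus Go client (the variable and function names are illustrative, as is the "active" phase value; the pipeline's real definition may differ):

package main

import "github.com/prometheus/client_golang/prometheus"

// A gauge partitioned by the two labels documented above.
var connectionsGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "conntrack_memory_connections",
		Help: "The total number of tracked connections in memory per group and phase.",
	},
	[]string{"group", "phase"},
)

func main() {
	prometheus.MustRegister(connectionsGauge)
	connectionsGauge.WithLabelValues("default", "active").Inc()
}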


### conntrack_output_records
1 change: 1 addition & 0 deletions network_definitions/config.yaml
@@ -41,6 +41,7 @@ extract:
- selector: {}
heartbeatInterval: 30s
endConnectionTimeout: 10s
terminatingTimeout: 5s
outputRecordTypes:
- newConnection
- flowLog
11 changes: 8 additions & 3 deletions pkg/api/conntrack.go
@@ -81,11 +81,14 @@ type ConnTrackOperationEnum struct {
Count string `yaml:"count" json:"count" doc:"count"`
Min string `yaml:"min" json:"min" doc:"min"`
Max string `yaml:"max" json:"max" doc:"max"`
First string `yaml:"first" json:"first" doc:"first"`
Last string `yaml:"last" json:"last" doc:"last"`
}

type ConnTrackSchedulingGroup struct {
Selector map[string]interface{} `yaml:"selector,omitempty" json:"selector,omitempty" doc:"key-value map to match against connection fields to apply this scheduling"`
EndConnectionTimeout Duration `yaml:"endConnectionTimeout,omitempty" json:"endConnectionTimeout,omitempty" doc:"duration of time to wait from the last flow log to end a connection"`
TerminatingTimeout Duration `yaml:"terminatingTimeout,omitempty" json:"terminatingTimeout,omitempty" doc:"duration of time to wait from detected FIN flag to end a connection"`
HeartbeatInterval Duration `yaml:"heartbeatInterval,omitempty" json:"heartbeatInterval,omitempty" doc:"duration of time to wait between heartbeat reports of a connection"`
}

@@ -95,7 +98,7 @@ func ConnTrackOperationName(operation string) string {

type ConnTrackTCPFlags struct {
FieldName string `yaml:"fieldName,omitempty" json:"fieldName,omitempty" doc:"name of the field containing TCP flags"`
DetectEndConnection bool `yaml:"detectEndConnection,omitempty" json:"detectEndConnection,omitempty" doc:"detect end connections by FIN_ACK flag"`
DetectEndConnection bool `yaml:"detectEndConnection,omitempty" json:"detectEndConnection,omitempty" doc:"detect end connections by FIN flag"`
SwapAB bool `yaml:"swapAB,omitempty" json:"swapAB,omitempty" doc:"swap source and destination when the first flowlog contains the SYN_ACK flag"`
}

@@ -113,7 +116,7 @@ func (ct *ConnTrack) Validate() error {
return conntrackInvalidError{splitABWithNoBidi: true,
msg: fmt.Errorf("output field %q has splitAB=true although bidirection is not enabled (fieldGroupARef is empty)", of.Name)}
}
if !isOperationValid(of.Operation) {
if !isOperationValid(of.Operation, of.SplitAB) {
return conntrackInvalidError{unknownOperation: true,
msg: fmt.Errorf("unknown operation %q in output field %q", of.Operation, of.Name)}
}
@@ -250,13 +253,15 @@ func addToSet(set map[string]struct{}, item string) bool {
return true
}

func isOperationValid(value string) bool {
func isOperationValid(value string, splitAB bool) bool {
valid := true
switch value {
case ConnTrackOperationName("Sum"):
case ConnTrackOperationName("Count"):
case ConnTrackOperationName("Min"):
case ConnTrackOperationName("Max"):
case ConnTrackOperationName("First"), ConnTrackOperationName("Last"):
valid = !splitAB
default:
valid = false
}
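Restated as a standalone sketch (not the package's unexported helper): sum, count, min and max stay valid with or without splitAB, while the new first and last operations are rejected when splitAB is set.

// isValidOperation mirrors the rule enforced above: the "first"/"last" copy
// operations cannot be split into A->B and B->A variants.
func isValidOperation(op string, splitAB bool) bool {
	switch op {
	case "sum", "count", "min", "max":
		return true
	case "first", "last":
		return !splitAB
	default:
		return false
	}
}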
4 changes: 3 additions & 1 deletion pkg/pipeline/conntrack_integ_test.go
@@ -51,6 +51,8 @@ parameters:
scheduling:
- selector: {}
endConnectionTimeout: 1s
heartbeatInterval: 10s
terminatingTimeout: 5s
outputRecordTypes:
- newConnection
- flowLog
@@ -120,7 +122,7 @@ func TestConnTrack(t *testing.T) {
}, test2.Interval(10*time.Millisecond))

// Wait a moment to make the connections expired
time.Sleep(2 * time.Second)
time.Sleep(5 * time.Second)

// Send something to the pipeline to allow the connection tracking output end connection records
in <- config.GenericMap{"DstAddr": "1.2.3.4"}
42 changes: 30 additions & 12 deletions pkg/pipeline/extract/conntrack/aggregator.go
@@ -33,20 +33,22 @@ type aggregator interface {
// addField adds an aggregate field to the connection
addField(conn connection)
// update updates the aggregate field in the connection based on the flow log.
update(conn connection, flowLog config.GenericMap, d direction)
update(conn connection, flowLog config.GenericMap, d direction, isFirst bool)
}

type aggregateBase struct {
inputField string
outputField string
splitAB bool
initVal float64
initVal interface{}
}

type aSum struct{ aggregateBase }
type aCount struct{ aggregateBase }
type aMin struct{ aggregateBase }
type aMax struct{ aggregateBase }
type aFirst struct{ aggregateBase }
type aLast struct{ aggregateBase }

// TODO: think of adding a more complex operation such as Average Packet Size which involves 2 input fields: Bytes/Packets

@@ -65,17 +67,23 @@ func newAggregator(of api.OutputField) (aggregator, error) {
var agg aggregator
switch of.Operation {
case api.ConnTrackOperationName("Sum"):
aggBase.initVal = 0
aggBase.initVal = float64(0)
agg = &aSum{aggBase}
case api.ConnTrackOperationName("Count"):
aggBase.initVal = 0
aggBase.initVal = float64(0)
agg = &aCount{aggBase}
case api.ConnTrackOperationName("Min"):
aggBase.initVal = math.MaxFloat64
agg = &aMin{aggBase}
case api.ConnTrackOperationName("Max"):
aggBase.initVal = -math.MaxFloat64
agg = &aMax{aggBase}
case api.ConnTrackOperationName("First"):
aggBase.initVal = nil
agg = &aFirst{aggBase}
case api.ConnTrackOperationName("Last"):
aggBase.initVal = nil
agg = &aLast{aggBase}
default:
return nil, fmt.Errorf("unknown operation: %q", of.Operation)
}
@@ -118,47 +126,57 @@ func (agg *aggregateBase) addField(conn connection) {
}
}

func (agg *aSum) update(conn connection, flowLog config.GenericMap, d direction) {
func (agg *aSum) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) {
outputField := agg.getOutputField(d)
v, err := agg.getInputFieldValue(flowLog)
if err != nil {
log.Errorf("error updating connection %x: %v", conn.getHash().hashTotal, err)
return
}
conn.updateAggValue(outputField, func(curr float64) float64 {
conn.updateAggFnValue(outputField, func(curr float64) float64 {
return curr + v
})
}

func (agg *aCount) update(conn connection, flowLog config.GenericMap, d direction) {
func (agg *aCount) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) {
outputField := agg.getOutputField(d)
conn.updateAggValue(outputField, func(curr float64) float64 {
conn.updateAggFnValue(outputField, func(curr float64) float64 {
return curr + 1
})
}

func (agg *aMin) update(conn connection, flowLog config.GenericMap, d direction) {
func (agg *aMin) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) {
outputField := agg.getOutputField(d)
v, err := agg.getInputFieldValue(flowLog)
if err != nil {
log.Errorf("error updating connection %x: %v", conn.getHash().hashTotal, err)
return
}

conn.updateAggValue(outputField, func(curr float64) float64 {
conn.updateAggFnValue(outputField, func(curr float64) float64 {
return math.Min(curr, v)
})
}

func (agg *aMax) update(conn connection, flowLog config.GenericMap, d direction) {
func (agg *aMax) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) {
outputField := agg.getOutputField(d)
v, err := agg.getInputFieldValue(flowLog)
if err != nil {
log.Errorf("error updating connection %x: %v", conn.getHash().hashTotal, err)
return
}

conn.updateAggValue(outputField, func(curr float64) float64 {
conn.updateAggFnValue(outputField, func(curr float64) float64 {
return math.Max(curr, v)
})
}

func (cp *aFirst) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) {
if isNew {
conn.updateAggValue(cp.outputField, flowLog[cp.inputField])
}
}

func (cp *aLast) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) {
conn.updateAggValue(cp.outputField, flowLog[cp.inputField])
}
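
In short, "first" copies the input field only from the flow log that opens the connection (the isFirst/isNew argument is true), while "last" overwrites the copy on every flow log. A standalone sketch of that behaviour, separate from the package's actual connection type:

// copyValue mimics the first/last semantics; isNew is true only for the flow
// log that created the connection.
type copyValue struct {
	first, last interface{}
}

func (c *copyValue) observe(v interface{}, isNew bool) {
	if isNew {
		c.first = v // "first": set once, on the connection's first flow log
	}
	c.last = v // "last": always the most recent flow log's value
}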
53 changes: 40 additions & 13 deletions pkg/pipeline/extract/conntrack/aggregator_test.go
@@ -45,6 +45,14 @@ func TestNewAggregator_Invalid(t *testing.T) {
Input: "Input",
})
require.NotNil(t, err)

// invalid first agg
_, err = newAggregator(api.OutputField{
Operation: "first",
SplitAB: true,
Input: "Input",
})
require.NotNil(t, err)
}

func TestNewAggregator_Valid(t *testing.T) {
@@ -56,27 +64,27 @@ func TestNewAggregator_Valid(t *testing.T) {
{
name: "Default SplitAB",
outputField: api.OutputField{Name: "MyAgg", Operation: "sum"},
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, 0}},
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0)}},
},
{
name: "Default input",
outputField: api.OutputField{Name: "MyAgg", Operation: "sum", SplitAB: true},
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", true, 0}},
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", true, float64(0)}},
},
{
name: "Custom input",
outputField: api.OutputField{Name: "MyAgg", Operation: "sum", Input: "MyInput"},
expected: &aSum{aggregateBase{"MyInput", "MyAgg", false, 0}},
expected: &aSum{aggregateBase{"MyInput", "MyAgg", false, float64(0)}},
},
{
name: "OperationType sum",
outputField: api.OutputField{Name: "MyAgg", Operation: "sum"},
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, 0}},
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0)}},
},
{
name: "OperationType count",
outputField: api.OutputField{Name: "MyAgg", Operation: "count"},
expected: &aCount{aggregateBase{"MyAgg", "MyAgg", false, 0}},
expected: &aCount{aggregateBase{"MyAgg", "MyAgg", false, float64(0)}},
},
{
name: "OperationType max",
@@ -88,6 +96,21 @@
outputField: api.OutputField{Name: "MyAgg", Operation: "min"},
expected: &aMin{aggregateBase{"MyAgg", "MyAgg", false, math.MaxFloat64}},
},
{
name: "Default first",
outputField: api.OutputField{Name: "MyCp", Operation: "first"},
expected: &aFirst{aggregateBase{"MyCp", "MyCp", false, nil}},
},
{
name: "Custom input first",
outputField: api.OutputField{Name: "MyCp", Operation: "first", Input: "MyInput"},
expected: &aFirst{aggregateBase{"MyInput", "MyCp", false, nil}},
},
{
name: "Default last",
outputField: api.OutputField{Name: "MyCp", Operation: "last"},
expected: &aLast{aggregateBase{"MyCp", "MyCp", false, nil}},
},
}

for _, test := range table {
@@ -106,6 +129,8 @@ func TestAddField_and_Update(t *testing.T) {
{Name: "numFlowLogs", Operation: "count"},
{Name: "minFlowLogBytes", Operation: "min", Input: "Bytes"},
{Name: "maxFlowLogBytes", Operation: "max", Input: "Bytes"},
{Name: "FirstFlowDirection", Operation: "first", Input: "FlowDirection"},
{Name: "LastFlowDirection", Operation: "last", Input: "FlowDirection"},
}
var aggs []aggregator
for _, of := range ofs {
@@ -119,38 +144,40 @@
portA := 1
portB := 9002
protocolA := 6
flowDirA := 0
flowDirB := 1

table := []struct {
name string
flowLog config.GenericMap
direction direction
expected map[string]float64
expected map[string]interface{}
}{
{
name: "flowLog 1",
flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, 100, 10, false),
flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDirA, 100, 10, false),
direction: dirAB,
expected: map[string]float64{"Bytes_AB": 100, "Bytes_BA": 0, "Packets": 10, "maxFlowLogBytes": 100, "minFlowLogBytes": 100, "numFlowLogs": 1},
expected: map[string]interface{}{"Bytes_AB": float64(100), "Bytes_BA": float64(0), "Packets": float64(10), "maxFlowLogBytes": float64(100), "minFlowLogBytes": float64(100), "numFlowLogs": float64(1), "FirstFlowDirection": 0, "LastFlowDirection": 0},
},
{
name: "flowLog 2",
flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, 200, 20, false),
flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDirB, 200, 20, false),
direction: dirBA,
expected: map[string]float64{"Bytes_AB": 100, "Bytes_BA": 200, "Packets": 30, "maxFlowLogBytes": 200, "minFlowLogBytes": 100, "numFlowLogs": 2},
expected: map[string]interface{}{"Bytes_AB": float64(100), "Bytes_BA": float64(200), "Packets": float64(30), "maxFlowLogBytes": float64(200), "minFlowLogBytes": float64(100), "numFlowLogs": float64(2), "FirstFlowDirection": 0, "LastFlowDirection": 1},
},
}

conn := NewConnBuilder(nil).Build()
for _, agg := range aggs {
agg.addField(conn)
}
expectedInits := map[string]float64{"Bytes_AB": 0, "Bytes_BA": 0, "Packets": 0, "maxFlowLogBytes": -math.MaxFloat64, "minFlowLogBytes": math.MaxFloat64, "numFlowLogs": 0}
expectedInits := map[string]interface{}{"Bytes_AB": float64(0), "Bytes_BA": float64(0), "Packets": float64(0), "maxFlowLogBytes": float64(-math.MaxFloat64), "minFlowLogBytes": float64(math.MaxFloat64), "numFlowLogs": float64(0), "FirstFlowDirection": nil, "LastFlowDirection": nil}
require.Equal(t, expectedInits, conn.(*connType).aggFields)

for _, test := range table {
for i, test := range table {
t.Run(test.name, func(t *testing.T) {
for _, agg := range aggs {
agg.update(conn, test.flowLog, test.direction)
agg.update(conn, test.flowLog, test.direction, i == 0)
}
require.Equal(t, test.expected, conn.(*connType).aggFields)
})