diff --git a/README.md b/README.md index 663b8a13a..fb80873b4 100644 --- a/README.md +++ b/README.md @@ -412,10 +412,6 @@ parameters: output: match-10.0 type: add_regex_if parameters: 10.0.* - - input: "{{.srcIP}},{{.srcPort}},{{.dstIP}},{{.dstPort}},{{.protocol}}" - output: isNewFlow - type: conn_tracking - parameters: "1" ``` The first rule `add_subnet` generates a new field named `srcSubnet` with the @@ -460,10 +456,6 @@ the contents of the `srcSubnet` field for entries that match regex expression sp in the `parameters` variable. In addition, the field `match-10.0_Matched` with value `true` is added to all matched entries -The seventh rule `conn_tracking` generates a new field named `isNewFlow` that contains -the contents of the `parameters` variable **only for new entries** (first seen in 120 seconds) -that match hash of template fields from the `input` variable. - > Note: above example describes all available transform network `Type` options diff --git a/contrib/dashboards/dashboard_details.json b/contrib/dashboards/dashboard_details.json index 6167f3809..5a6e2053a 100644 --- a/contrib/dashboards/dashboard_details.json +++ b/contrib/dashboards/dashboard_details.json @@ -363,7 +363,7 @@ "steppedLine": false, "targets": [ { - "expr": "topk(10,rate(flp_connections_per_destination_subnet[1m]))", + "expr": "topk(10,rate(flp_connections_per_destination_subnet{_RecordType=\"newConnection\"}[1m]))", "format": "time_series", "intervalFactor": 2, "legendFormat": "", diff --git a/contrib/dashboards/jsonnet/dashboard_details.jsonnet b/contrib/dashboards/jsonnet/dashboard_details.jsonnet index 53847ed2a..fa7607fd9 100644 --- a/contrib/dashboards/jsonnet/dashboard_details.jsonnet +++ b/contrib/dashboards/jsonnet/dashboard_details.jsonnet @@ -90,7 +90,7 @@ dashboard.new( ) .addTarget( prometheus.target( - expr='topk(10,rate(flp_connections_per_destination_subnet[1m]))', + expr='topk(10,rate(flp_connections_per_destination_subnet{_RecordType="newConnection"}[1m]))', ) ), gridPos={ x: 0, diff --git a/contrib/kubernetes/flowlogs-pipeline.conf.yaml b/contrib/kubernetes/flowlogs-pipeline.conf.yaml index bde7e503e..86c8c8a13 100644 --- a/contrib/kubernetes/flowlogs-pipeline.conf.yaml +++ b/contrib/kubernetes/flowlogs-pipeline.conf.yaml @@ -71,10 +71,6 @@ parameters: output: srcSubnet type: add_subnet parameters: /16 - - input: '{{.srcIP}},{{.srcPort}},{{.dstIP}},{{.dstPort}},{{.proto}}' - output: isNewFlow - type: conn_tracking - parameters: "1" - input: dstIP output: dstSubnet type: add_subnet @@ -100,10 +96,12 @@ parameters: fields: - srcIP - srcPort + - srcSubnet - name: dst fields: - dstIP - dstPort + - dstSubnet - name: protocol fields: - proto @@ -181,7 +179,6 @@ parameters: - dstSubnet - _RecordType operation: count - recordKey: isNewFlow - name: src_connection_count by: - srcSubnet @@ -324,8 +321,8 @@ parameters: value: dest_connection_subnet_count valueKey: recent_count labels: - - by - - aggregate + - _RecordType + - dstSubnet buckets: [] - name: connections_per_source_subnet type: counter diff --git a/docs/api.md b/docs/api.md index 64a7e5c39..2f7b620e0 100644 --- a/docs/api.md +++ b/docs/api.md @@ -123,7 +123,6 @@ Following is the supported API format for network transformations: input: entry input field output: entry output field type: (enum) one of the following: - conn_tracking: set output field to value of parameters field only for new flows by matching template in input field add_regex_if: add output field if input field satisfies regex pattern from parameters field add_if: add output field if input field satisfies criteria from parameters field add_subnet: add output subnet field from input field and prefix length from parameters field diff --git a/docs/metrics.md b/docs/metrics.md index 04859c0af..574d57ae2 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -68,10 +68,10 @@ and the transformation to generate the exported metric. ### connection rate per dest subnet | **Description** | This metric observes network connections rate per destination subnet | |:---|:---| -| **Details** | Counts the number of connections per subnet with network prefix length /16 (using conn_tracking sum isNewFlow field) | +| **Details** | Counts the number of connections per subnet with network prefix length /16 | | **Usage** | Evaluate network connections per subnet | | **Tags** | rate, subnet | -| **Operation** | aggregate by `dstSubnet, _RecordType` and `count` field `isNewFlow` | +| **Operation** | aggregate by `dstSubnet, _RecordType` and `count` | | **Exposed as** | `flp_connections_per_destination_subnet` of type `counter` | | **Visualized as** | "Connections rate per destinationIP /16 subnets" on dashboard `details` | ||| diff --git a/network_definitions/config.yaml b/network_definitions/config.yaml index 3e0c7fd6b..67f1a653d 100644 --- a/network_definitions/config.yaml +++ b/network_definitions/config.yaml @@ -46,10 +46,12 @@ extract: fields: - srcIP - srcPort + - srcSubnet - name: dst fields: - dstIP - dstPort + - dstSubnet - name: protocol fields: - proto diff --git a/network_definitions/connection_rate_per_dest_subnet.yaml b/network_definitions/connection_rate_per_dest_subnet.yaml index 7f22c237a..95afb8af0 100644 --- a/network_definitions/connection_rate_per_dest_subnet.yaml +++ b/network_definitions/connection_rate_per_dest_subnet.yaml @@ -2,7 +2,7 @@ description: This metric observes network connections rate per destination subnet details: - Counts the number of connections per subnet with network prefix length /16 (using conn_tracking sum isNewFlow field) + Counts the number of connections per subnet with network prefix length /16 usage: Evaluate network connections per subnet tags: @@ -10,10 +10,6 @@ tags: - subnet transform: rules: - - input: "{{.srcIP}},{{.srcPort}},{{.dstIP}},{{.dstPort}},{{.proto}}" - output: isNewFlow - type: conn_tracking - parameters: "1" - input: dstIP output: dstSubnet type: add_subnet @@ -26,7 +22,6 @@ extract: - dstSubnet - _RecordType operation: count - recordKey: isNewFlow encode: type: prom prom: @@ -36,12 +31,12 @@ encode: filter: {key: name, value: dest_connection_subnet_count} valueKey: recent_count labels: - - by - - aggregate + - _RecordType + - dstSubnet visualization: type: grafana grafana: - - expr: 'topk(10,rate(flp_connections_per_destination_subnet[1m]))' + - expr: 'topk(10,rate(flp_connections_per_destination_subnet{_RecordType="newConnection"}[1m]))' type: graphPanel dashboard: details title: diff --git a/pkg/api/api.go b/pkg/api/api.go index 4d83991a4..34e5b390c 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -34,7 +34,6 @@ const ( FilterType = "filter" ConnTrackType = "conntrack" NoneType = "none" - ConnTrackingRuleType = "conn_tracking" AddRegExIfRuleType = "add_regex_if" AddIfRuleType = "add_if" AddSubnetRuleType = "add_subnet" diff --git a/pkg/api/transform_network.go b/pkg/api/transform_network.go index a5da27694..b4422bf7e 100644 --- a/pkg/api/transform_network.go +++ b/pkg/api/transform_network.go @@ -25,7 +25,6 @@ type TransformNetwork struct { } type TransformNetworkOperationEnum struct { - ConnTracking string `yaml:"conn_tracking" json:"conn_tracking" doc:"set output field to value of parameters field only for new flows by matching template in input field"` AddRegExIf string `yaml:"add_regex_if" json:"add_regex_if" doc:"add output field if input field satisfies regex pattern from parameters field"` AddIf string `yaml:"add_if" json:"add_if" doc:"add output field if input field satisfies criteria from parameters field"` AddSubnet string `yaml:"add_subnet" json:"add_subnet" doc:"add output subnet field from input field and prefix length from parameters field"` diff --git a/pkg/pipeline/transform/connection_tracking/connection_tracking.go b/pkg/pipeline/transform/connection_tracking/connection_tracking.go deleted file mode 100644 index ad7af7859..000000000 --- a/pkg/pipeline/transform/connection_tracking/connection_tracking.go +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright (C) 2021 IBM, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -// TODO: Delete this package once the connection tracking module is done. - -package connection_tracking - -import ( - "time" - - "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" -) - -type cacheInfo struct { - flowlogsCount int -} - -const defaultExpiryTime = 120 - -type ConnectionTracking struct { - mCache *utils.TimedCache - expiryTime int64 -} - -var ( - CT ConnectionTracking - expiryTime = int64(defaultExpiryTime) -) - -func (ct ConnectionTracking) IsFlowKnown(flowIDFields string) bool { - if _, ok := ct.mCache.GetCacheEntry(flowIDFields); ok { - return true - } - return false -} - -func (ct ConnectionTracking) AddFlow(flowIDFields string) bool { - entry := cacheInfo{} - isNew := true - if cacheEntry, ok := ct.mCache.GetCacheEntry(flowIDFields); ok { - entry = cacheEntry.(cacheInfo) - isNew = false - } - entry.flowlogsCount += 1 - - ct.mCache.UpdateCacheEntry(flowIDFields, entry) - - return isNew -} - -func (ct ConnectionTracking) Cleanup(entry interface{}) {} - -func (ct ConnectionTracking) cleanupExpiredEntriesLoop() { - ticker := time.NewTicker(time.Duration(ct.expiryTime) * time.Second) - done := make(chan bool) - go func() { - for { - select { - case <-done: - return - case <-ticker.C: - ct.mCache.CleanupExpiredEntries(ct.expiryTime, ct) - } - } - }() -} - -func InitConnectionTracking() { - CT = ConnectionTracking{ - mCache: utils.NewTimedCache(), - expiryTime: expiryTime, - } - - CT.cleanupExpiredEntriesLoop() -} diff --git a/pkg/pipeline/transform/connection_tracking/connection_tracking_test.go b/pkg/pipeline/transform/connection_tracking/connection_tracking_test.go deleted file mode 100644 index d3e7f4670..000000000 --- a/pkg/pipeline/transform/connection_tracking/connection_tracking_test.go +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright (C) 2021 IBM, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package connection_tracking - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func Test_InitConnectionTracking(t *testing.T) { - InitConnectionTracking() - require.NotNil(t, CT) -} - -func Test_AddFlow(t *testing.T) { - InitConnectionTracking() - isNew := CT.AddFlow("test") - require.Equal(t, true, isNew) - cacheEntry, found := CT.mCache.GetCacheEntry("test") - cInfo := cacheEntry.(cacheInfo) - require.Equal(t, true, found) - require.Equal(t, cInfo.flowlogsCount, 1) - - isNew = CT.AddFlow("test") - require.Equal(t, false, isNew) - cacheEntry, _ = CT.mCache.GetCacheEntry("test") - cInfo = cacheEntry.(cacheInfo) - require.Equal(t, cInfo.flowlogsCount, 1) - -} - -func Test_IsFlowKnown(t *testing.T) { - InitConnectionTracking() - _ = CT.AddFlow("test") - isNew := CT.IsFlowKnown("test") - require.Equal(t, true, isNew) - isNew = CT.IsFlowKnown("test_unknown") - require.Equal(t, false, isNew) -} - -func Test_cleanupExpiredEntries(t *testing.T) { - InitConnectionTracking() - CT.expiryTime = 1 - - _ = CT.AddFlow("test") - CT.mCache.CleanupExpiredEntries(CT.expiryTime, CT) - _, found := CT.mCache.GetCacheEntry("test") - require.Equal(t, true, found) - time.Sleep(2 * time.Second) - CT.mCache.CleanupExpiredEntries(CT.expiryTime, CT) - _, found = CT.mCache.GetCacheEntry("test") - require.Equal(t, false, found) -} - -func Test_cleanupExpiredEntriesLoop(t *testing.T) { - expiryTime = 1 - InitConnectionTracking() - - _ = CT.AddFlow("test") - require.Equal(t, CT.IsFlowKnown("test"), true) - time.Sleep(2 * time.Second) - require.Equal(t, CT.IsFlowKnown("test"), false) -} diff --git a/pkg/pipeline/transform/transform_network.go b/pkg/pipeline/transform/transform_network.go index 67dc275e9..a4f060dbf 100644 --- a/pkg/pipeline/transform/transform_network.go +++ b/pkg/pipeline/transform/transform_network.go @@ -18,17 +18,14 @@ package transform import ( - "bytes" "fmt" "net" "regexp" "strconv" - "text/template" "github.com/Knetic/govaluate" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" - "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/connection_tracking" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/location" netdb "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/network_services" @@ -54,26 +51,6 @@ func (n *Network) TransformEntry(inputEntry config.GenericMap) config.GenericMap for _, rule := range n.Rules { switch rule.Type { - case api.TransformNetworkOperationName("ConnTracking"): - template, err := template.New("").Parse(rule.Input) - if err != nil { - panic(err) - } - buf := &bytes.Buffer{} - err = template.Execute(buf, outputEntry) - if err != nil { - panic(err) - } - FlowIDFieldsAsString := buf.String() - isNew := connection_tracking.CT.AddFlow(FlowIDFieldsAsString) - if isNew { - if rule.Parameters != "" { - outputEntry[rule.Output] = rule.Parameters - } else { - outputEntry[rule.Output] = true - } - } - case api.TransformNetworkOperationName("AddRegExIf"): matched, err := regexp.MatchString(rule.Parameters, fmt.Sprintf("%s", outputEntry[rule.Input])) if err != nil { @@ -175,7 +152,6 @@ func (n *Network) TransformEntry(inputEntry config.GenericMap) config.GenericMap func NewTransformNetwork(params config.StageParam) (Transformer, error) { var needToInitLocationDB = false var needToInitKubeData = false - var needToInitConnectionTracking = false var needToInitNetworkServices = false jsonNetworkTransform := api.TransformNetwork{} @@ -188,17 +164,11 @@ func NewTransformNetwork(params config.StageParam) (Transformer, error) { needToInitLocationDB = true case api.TransformNetworkOperationName("AddKubernetes"): needToInitKubeData = true - case api.TransformNetworkOperationName("ConnTracking"): - needToInitConnectionTracking = true case api.TransformNetworkOperationName("AddService"): needToInitNetworkServices = true } } - if needToInitConnectionTracking { - connection_tracking.InitConnectionTracking() - } - if needToInitLocationDB { err := location.InitLocationDB() if err != nil { diff --git a/pkg/pipeline/transform/transform_network_test.go b/pkg/pipeline/transform/transform_network_test.go index a7db88931..346c0c736 100644 --- a/pkg/pipeline/transform/transform_network_test.go +++ b/pkg/pipeline/transform/transform_network_test.go @@ -210,41 +210,6 @@ func InitNewTransformNetwork(t *testing.T, configFile string) Transformer { return newTransform } -func Test_ConnTrackingTransformNetwork(t *testing.T) { - var yamlConfig = []byte(` -log-level: debug -pipeline: - - name: transform1 - - name: write1 - follows: transform1 -parameters: - - name: transform1 - transform: - type: network - network: - rules: - - input: "{{.srcIP}},{{.srcPort}},{{.dstIP}},{{.dstPort}},{{.protocol}}" - output: isNewFlow - type: conn_tracking - parameters: "777" - - name: write1 - write: - type: stdout -`) - newNetworkTransform := InitNewTransformNetwork(t, string(yamlConfig)).(*Network) - require.NotNil(t, newNetworkTransform) - - // first time flow is new - entry := test.GetIngestMockEntry(false) - output := newNetworkTransform.Transform([]config.GenericMap{entry}) - require.Equal(t, "777", output[0]["isNewFlow"]) - - // second time, same flow is not new - entry = test.GetIngestMockEntry(false) - output = newNetworkTransform.Transform([]config.GenericMap{entry}) - require.Equal(t, nil, output[0]["isNewFlow"]) -} - func Test_TransformNetworkDependentRulesAddRegExIf(t *testing.T) { var yamlConfig = []byte(` log-level: debug diff --git a/pkg/test/e2e/pipline/flp-config.yaml b/pkg/test/e2e/pipline/flp-config.yaml index 2ade78c65..298f44384 100644 --- a/pkg/test/e2e/pipline/flp-config.yaml +++ b/pkg/test/e2e/pipline/flp-config.yaml @@ -56,10 +56,6 @@ data: output: srcSubnet type: add_subnet parameters: /16 - - input: '{{.srcIP}},{{.srcPort}},{{.dstIP}},{{.dstPort}},{{.proto}}' - output: isNewFlow - type: conn_tracking - parameters: "1" - input: dstIP output: dstSubnet type: add_subnet