From 7cc1a8f073cbef2d9afe9250180b510f55839bb8 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Wed, 3 Aug 2022 10:49:32 +0300 Subject: [PATCH 1/5] Replace yaml.Unmarshal() with strict version --- pkg/confgen/confgen.go | 4 ++-- pkg/confgen/confgen_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/confgen/confgen.go b/pkg/confgen/confgen.go index 80eb60e89..cd2419501 100644 --- a/pkg/confgen/confgen.go +++ b/pkg/confgen/confgen.go @@ -145,9 +145,9 @@ func (cg *ConfGen) ParseDefinition(name string, bytes []byte) error { // parse yaml var defFile DefFile - err = yaml.Unmarshal(bytes, &defFile) + err = yaml.UnmarshalStrict(bytes, &defFile) if err != nil { - log.Debugf("%s yaml.Unmarshal err: %v ", name, err) + log.Debugf("%s yaml.UnmarshalStrict err: %v ", name, err) return err } diff --git a/pkg/confgen/confgen_test.go b/pkg/confgen/confgen_test.go index ae43e0806..bb92f1790 100644 --- a/pkg/confgen/confgen_test.go +++ b/pkg/confgen/confgen_test.go @@ -192,7 +192,7 @@ func Test_RunShortConfGen(t *testing.T) { destCfgBytes, err := ioutil.ReadFile(configOut) require.NoError(t, err) var out config.ConfigFileStruct - err = yaml.Unmarshal(destCfgBytes, &out) + err = yaml.UnmarshalStrict(destCfgBytes, &out) require.NoError(t, err) require.Len(t, out.Pipeline, 4) require.Len(t, out.Parameters, 4) @@ -280,7 +280,7 @@ func Test_RunLongConfGen(t *testing.T) { destCfgBytes, err := ioutil.ReadFile(configOut) require.NoError(t, err) var out config.ConfigFileStruct - err = yaml.Unmarshal(destCfgBytes, &out) + err = yaml.UnmarshalStrict(destCfgBytes, &out) require.NoError(t, err) require.Len(t, out.Parameters, 6) require.Len(t, out.Pipeline, 6) From f42fb5354490dfad35dd66aacd2247243b45c98f Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Wed, 3 Aug 2022 10:47:05 +0300 Subject: [PATCH 2/5] Replace json.Unmarshal() with a strict version in config --- pkg/confgen/encode.go | 5 ++--- pkg/confgen/extract.go | 5 ++--- pkg/confgen/transform.go | 5 
++--- pkg/confgen/visualization.go | 5 ++--- pkg/config/config.go | 14 ++++++++++-- pkg/config/config_test.go | 41 ++++++++++++++++++++++++++++++++++++ 6 files changed, 61 insertions(+), 14 deletions(-) create mode 100644 pkg/config/config_test.go diff --git a/pkg/confgen/encode.go b/pkg/confgen/encode.go index b32059b43..f43c06c2e 100644 --- a/pkg/confgen/encode.go +++ b/pkg/confgen/encode.go @@ -18,10 +18,9 @@ package confgen import ( - "encoding/json" - jsoniter "github.com/json-iterator/go" "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" log "github.com/sirupsen/logrus" ) @@ -35,7 +34,7 @@ func (cg *ConfGen) parseEncode(encode *map[string]interface{}) (*api.PromEncode, } var prom api.PromEncode - err = json.Unmarshal(b, &prom) + err = config.JsonUnmarshalStrict(b, &prom) if err != nil { log.Debugf("Unmarshal aggregate.Definitions err: %v ", err) return nil, err diff --git a/pkg/confgen/extract.go b/pkg/confgen/extract.go index e2eac48a5..6604904d3 100644 --- a/pkg/confgen/extract.go +++ b/pkg/confgen/extract.go @@ -18,9 +18,8 @@ package confgen import ( - "encoding/json" - jsoniter "github.com/json-iterator/go" + "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/aggregate" log "github.com/sirupsen/logrus" ) @@ -35,7 +34,7 @@ func (cg *ConfGen) parseExtract(extract *map[string]interface{}) (*aggregate.Def } var jsonNetworkAggregate aggregate.Definitions - err = json.Unmarshal(b, &jsonNetworkAggregate) + err = config.JsonUnmarshalStrict(b, &jsonNetworkAggregate) if err != nil { log.Debugf("Unmarshal aggregate.Definitions err: %v ", err) return nil, err diff --git a/pkg/confgen/transform.go b/pkg/confgen/transform.go index 74727b7fe..7e5b8d917 100644 --- a/pkg/confgen/transform.go +++ b/pkg/confgen/transform.go @@ -18,10 +18,9 @@ package confgen import ( - "encoding/json" - jsoniter "github.com/json-iterator/go" 
"github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" log "github.com/sirupsen/logrus" ) @@ -34,7 +33,7 @@ func (cg *ConfGen) parseTransport(transform *map[string]interface{}) (*api.Trans } var jsonNetworkTransform api.TransformNetwork - err = json.Unmarshal(b, &jsonNetworkTransform) + err = config.JsonUnmarshalStrict(b, &jsonNetworkTransform) if err != nil { log.Debugf("Unmarshal transform.TransformNetwork err: %v ", err) return nil, err diff --git a/pkg/confgen/visualization.go b/pkg/confgen/visualization.go index 9ae2a484a..99a52a593 100644 --- a/pkg/confgen/visualization.go +++ b/pkg/confgen/visualization.go @@ -18,9 +18,8 @@ package confgen import ( - "encoding/json" - jsoniter "github.com/json-iterator/go" + "github.com/netobserv/flowlogs-pipeline/pkg/config" log "github.com/sirupsen/logrus" ) @@ -62,7 +61,7 @@ func (cg *ConfGen) parseVisualization(visualization *Visualization) (*Visualizat } var jsonVisualization Visualization - err = json.Unmarshal(b, &jsonVisualization) + err = config.JsonUnmarshalStrict(b, &jsonVisualization) if err != nil { log.Debugf("Unmarshal aggregate.Definitions err: %v ", err) return nil, err diff --git a/pkg/config/config.go b/pkg/config/config.go index 85f5ad12e..b4cfd4d73 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -18,6 +18,7 @@ package config import ( + "bytes" "encoding/json" "github.com/netobserv/flowlogs-pipeline/pkg/api" @@ -99,14 +100,14 @@ func ParseConfig(opts Options) (ConfigFileStruct, error) { out := ConfigFileStruct{} logrus.Debugf("opts.PipeLine = %v ", opts.PipeLine) - err := json.Unmarshal([]byte(opts.PipeLine), &out.Pipeline) + err := JsonUnmarshalStrict([]byte(opts.PipeLine), &out.Pipeline) if err != nil { logrus.Errorf("error when reading config file: %v", err) return out, err } logrus.Debugf("stages = %v ", out.Pipeline) - err = json.Unmarshal([]byte(opts.Parameters), &out.Parameters) + err = JsonUnmarshalStrict([]byte(opts.Parameters), 
&out.Parameters) if err != nil { logrus.Errorf("error when reading config file: %v", err) return out, err @@ -114,3 +115,12 @@ func ParseConfig(opts Options) (ConfigFileStruct, error) { logrus.Debugf("params = %v ", out.Parameters) return out, nil } + +// JsonUnmarshalStrict is like json.Unmarshal except that any fields found in +// the data that do not have corresponding struct members result in an error. +// Duplicate keys are not detected, and data after the first JSON value is ignored. +func JsonUnmarshalStrict(data []byte, v interface{}) error { + dec := json.NewDecoder(bytes.NewReader(data)) + dec.DisallowUnknownFields() + return dec.Decode(v) +} diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go new file mode 100644 index 000000000..ce9b7ac3a --- /dev/null +++ b/pkg/config/config_test.go @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package config + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestJsonUnmarshalStrict(t *testing.T) { + type Message struct { + Foo int `json:"F"` + Bar string `json:"B"` + } + msg := `{"F":1, "B":"bbb"}` + var actualMsg Message + expectedMsg := Message{Foo: 1, Bar: "bbb"} + err := JsonUnmarshalStrict([]byte(msg), &actualMsg) + require.NoError(t, err) + require.Equal(t, expectedMsg, actualMsg) + + msg = `{"F":1, "B":"bbb", "NewField":0}` + err = JsonUnmarshalStrict([]byte(msg), &actualMsg) + require.Error(t, err) +} From 7732bec4c3b7614d100fc46a22646e920b3a7703 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Wed, 3 Aug 2022 11:07:29 +0300 Subject: [PATCH 3/5] Mark DeserializeJSONToMap() as a helper func --- pkg/test/utils.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/test/utils.go b/pkg/test/utils.go index 3ce64c242..e57ac6ef7 100644 --- a/pkg/test/utils.go +++ b/pkg/test/utils.go @@ -143,6 +143,7 @@ func RunCommand(command string) string { } func DeserializeJSONToMap(t *testing.T, in string) config.GenericMap { + t.Helper() var m config.GenericMap err := json.Unmarshal([]byte(in), &m) require.NoError(t, err) From be64c8d8953d7646c2467ba7d5d863c8c22c3ba5 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Wed, 3 Aug 2022 11:17:09 +0300 Subject: [PATCH 4/5] Update generated files --- .../kubernetes/flowlogs-pipeline.conf.yaml | 50 +++++++++---------- docs/metrics.md | 32 ++++++------ 2 files changed, 41 insertions(+), 41 deletions(-) diff --git a/contrib/kubernetes/flowlogs-pipeline.conf.yaml b/contrib/kubernetes/flowlogs-pipeline.conf.yaml index f6d8d010d..5d42bd5e0 100644 --- a/contrib/kubernetes/flowlogs-pipeline.conf.yaml +++ b/contrib/kubernetes/flowlogs-pipeline.conf.yaml @@ -1,15 +1,28 @@ # This file was generated automatically by flowlogs-pipeline confgenerator log-level: error +pipeline: +- name: ingest_collector +- name: transform_generic + follows: ingest_collector +- name: transform_network + 
follows: transform_generic +- name: extract_aggregate + follows: transform_network +- name: encode_prom + follows: extract_aggregate +- name: write_loki + follows: transform_network parameters: -- ingest: +- name: ingest_collector + ingest: + type: collector collector: - hostname: 0.0.0.0 + hostName: 0.0.0.0 port: 2055 portLegacy: 2056 - type: collector - name: ingest_collector - name: transform_generic transform: + type: generic generic: policy: replace_keys rules: @@ -31,9 +44,9 @@ parameters: output: srcAS - input: DstAS output: dstAS - type: generic - name: transform_network transform: + type: network network: rules: - input: dstPort @@ -71,8 +84,9 @@ parameters: - input: dstIP output: dstLocation type: add_location - type: network -- extract: +- name: extract_aggregate + extract: + type: aggregates aggregates: - name: bandwidth_network_service by: @@ -140,9 +154,9 @@ parameters: by: - service operation: count - type: aggregates - name: extract_aggregate -- encode: +- name: encode_prom + encode: + type: prom prom: metrics: - name: bandwidth_per_network_service @@ -292,25 +306,11 @@ parameters: buckets: [] port: 9102 prefix: flp_ - type: prom - name: encode_prom - name: write_loki write: + type: loki loki: url: http://loki.default.svc.cluster.local:3100 staticLabels: job: flowlogs-pipeline - type: loki -pipeline: -- name: ingest_collector -- follows: ingest_collector - name: transform_generic -- follows: transform_generic - name: transform_network -- follows: transform_network - name: extract_aggregate -- follows: extract_aggregate - name: encode_prom -- follows: transform_network - name: write_loki diff --git a/docs/metrics.md b/docs/metrics.md index 3f41838c8..8114e0e31 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -17,7 +17,7 @@ and the transformation to generate the exported metric. 
|:---|:---| | **Details** | Sum bytes for all traffic per network service | | **Usage** | Evaluate network usage breakdown per network service | -| **Labels** | bandwidth, graph, rate, network-service | +| **Tags** | bandwidth, graph, rate, network-service | | **Operation** | aggregate by `service` and `sum` field `bytes` | | **Exposed as** | `flp_bandwidth_per_network_service` of type `counter` | | **Visualized as** | "Bandwidth per network service" on dashboard `details` | @@ -29,7 +29,7 @@ and the transformation to generate the exported metric. |:---|:---| | **Details** | Sum bandwidth bytes for all traffic per source / destination subnet pair | | **Usage** | Evaluate network usage breakdown per source / destination subnet pair | -| **Labels** | bandwidth, graph, rate, subnet | +| **Tags** | bandwidth, graph, rate, subnet | | **Operation** | aggregate by `dstSubnet24, srcSubnet24` and `sum` field `bytes` | | **Exposed as** | `flp_bandwidth_per_source_destination_subnet` of type `counter` | | **Visualized as** | "Bandwidth per src and destination subnet" on dashboard `details` | @@ -41,7 +41,7 @@ and the transformation to generate the exported metric. |:---|:---| | **Details** | Sum bytes for all traffic per source subnet | | **Usage** | Evaluate network usage breakdown per source subnet | -| **Labels** | bandwidth, graph, rate, subnet | +| **Tags** | bandwidth, graph, rate, subnet | | **Operation** | aggregate by `srcSubnet` and `sum` field `bytes` | | **Exposed as** | `flp_bandwidth_per_source_subnet` of type `counter` | | **Visualized as** | "Bandwidth per source subnet" on dashboard `details` | @@ -53,7 +53,7 @@ and the transformation to generate the exported metric. 
|:---|:---| | **Details** | Counts the number of connections per subnet with network prefix length /16 (using conn_tracking sum isNewFlow field) | | **Usage** | Evaluate network connections per subnet | -| **Labels** | rate, subnet | +| **Tags** | rate, subnet | | **Operation** | aggregate by `dstSubnet` and `count` field `isNewFlow` | | **Exposed as** | `flp_connections_per_destination_subnet` of type `counter` | | **Visualized as** | "Connections rate per destinationIP /16 subnets" on dashboard `details` | @@ -65,7 +65,7 @@ and the transformation to generate the exported metric. |:---|:---| | **Details** | Counts the number of connections per subnet with network prefix length /16 | | **Usage** | Evaluate network connections per subnet | -| **Labels** | rate, subnet | +| **Tags** | rate, subnet | | **Operation** | aggregate by `srcSubnet` and `count` | | **Exposed as** | `flp_connections_per_source_subnet` of type `counter` | | **Visualized as** | "Connections rate per sourceIP /16 subnets" on dashboard `details` | @@ -77,7 +77,7 @@ and the transformation to generate the exported metric. |:---|:---| | **Details** | Counts the number of connections per tcp flags | | **Usage** | Evaluate difference in connections rate of different TCP Flags. Can be used, for example, to identify syn-attacks. | -| **Labels** | rate, TCPFlags | +| **Tags** | rate, TCPFlags | | **Operation** | aggregate by `TCPFlags` and `count` | | **Exposed as** | `flp_connections_per_tcp_flags` of type `counter` | | **Visualized as** | "Connections rate per TCPFlags" on dashboard `details` | @@ -89,7 +89,7 @@ and the transformation to generate the exported metric. 
|:---|:---| | **Details** | Aggregates flow records by values of "DstAS" field and counts the number of entries in each aggregate with non zero value | | **Usage** | Evaluate amount of connections targeted at different Autonomous Systems | -| **Labels** | rate, count, AS | +| **Tags** | rate, count, AS | | **Operation** | aggregate by `dstAS` and `count` | | **Exposed as** | `flp_connections_per_destination_as` of type `counter` | | **Visualized as** | "Connections rate per destination AS" on dashboard `details` | @@ -101,7 +101,7 @@ and the transformation to generate the exported metric. |:---|:---| | **Details** | Aggregates flow records by values of "SrcAS" field and counts the number of entries in each aggregate with non zero value | | **Usage** | Evaluate amount of connections initiated by different Autonomous Systems | -| **Labels** | rate, count, AS | +| **Tags** | rate, count, AS | | **Operation** | aggregate by `srcAS` and `count` | | **Exposed as** | `flp_connections_per_source_as` of type `counter` | | **Visualized as** | "Connections rate per source AS" on dashboard `details` | @@ -113,7 +113,7 @@ and the transformation to generate the exported metric. |:---|:---| | **Details** | Count the number of distinct source / destination subnet pairs | | **Usage** | Evaluate network usage breakdown per source / destination subnet pair | -| **Labels** | count, graph, rate, subnet | +| **Tags** | count, graph, rate, subnet | | **Operation** | aggregate by `dstSubnet24, srcSubnet24` and `count` | | **Exposed as** | `flp_count_per_source_destination_subnet` of type `counter` | | **Visualized as** | "Connections rate of src / destination subnet occurences" on dashboard `details` | @@ -125,7 +125,7 @@ and the transformation to generate the exported metric. 
|:---|:---| | **Details** | Sum egress bytes for all traffic per destination subnet | | **Usage** | Evaluate network usage breakdown per destination subnet | -| **Labels** | bandwidth, graph, rate, subnet | +| **Tags** | bandwidth, graph, rate, subnet | | **Operation** | aggregate by `dstSubnet` and `sum` field `bytes` | | **Exposed as** | `flp_egress_per_destination_subnet` of type `counter` | | **Visualized as** | "Bandwidth per destination subnet" on dashboard `details` | @@ -138,7 +138,7 @@ and the transformation to generate the exported metric. |:---|:---| | **Details** | Sum egress bytes for all traffic per namespace | | **Usage** | Evaluate network usage breakdown per namespace | -| **Labels** | kubernetes, bandwidth, graph | +| **Tags** | kubernetes, bandwidth, graph | | **Operation** | aggregate by `srcK8S_Namespace, srcK8S_Type` and `sum` field `bytes` | | **Exposed as** | `flp_egress_per_namespace` of type `counter` | | **Visualized as** | "Bandwidth per namespace" on dashboard `details` | @@ -150,7 +150,7 @@ and the transformation to generate the exported metric. |:---|:---| | **Details** | Flows length distribution over time | | **Usage** | Evaluate flows length behavior including mice/elephant use-case | -| **Labels** | bandwidth, mice, elephant, rate | +| **Tags** | bandwidth, mice, elephant, rate | | **Operation** | aggregate by `all_Evaluate` and `raw_values` field `bytes` | | **Exposed as** | `flp_flows_length_histogram` of type `histogram` | | **Visualized as** | "Flows length heatmap" on dashboard `details` | @@ -163,7 +163,7 @@ and the transformation to generate the exported metric. 
|:---|:---| | **Details** | Counts the number of connections per geo-location based on destination IP | | **Usage** | Evaluate network connections geo-location | -| **Labels** | rate, connections-count, geo-location, destinationIP | +| **Tags** | rate, connections-count, geo-location, destinationIP | | **Operation** | aggregate by `dstLocation_CountryName` and `count` | | **Exposed as** | `flp_connections_per_destination_location` of type `counter` | | **Visualized as** | "Connections rate per destinationIP geo-location" on dashboard `details` | @@ -175,7 +175,7 @@ and the transformation to generate the exported metric. |:---|:---| | **Details** | Sum bytes for all traffic per source namespace | | **Usage** | Evaluate network usage breakdown per source namespace | -| **Labels** | loki, graph, rate, namespace | +| **Tags** | loki, graph, rate, namespace | | **Visualized as** | "Bandwidth per source namespace" on dashboard `details` | ||| @@ -185,7 +185,7 @@ and the transformation to generate the exported metric. |:---|:---| | **Details** | Rate of loki logs per sec | | **Usage** | Evaluate loki service usage | -| **Labels** | loki, graph, rate | +| **Tags** | loki, graph, rate | | **Visualized as** | "Loki logs rate" on dashboard `details` | ||| @@ -195,7 +195,7 @@ and the transformation to generate the exported metric. 
|:---|:---| | **Details** | Counts the number of connections per network service based on destination port number and protocol | | **Usage** | Evaluate network services | -| **Labels** | rate, network-services, destination-port, destination-protocol | +| **Tags** | rate, network-services, destination-port, destination-protocol | | **Operation** | aggregate by `service` and `count` | | **Exposed as** | `flp_service_count` of type `counter` | | **Visualized as** | "Network services connections rate" on dashboard `details` | From df5b77576a9480f0ad9f8fbc99f417ae24d1fce1 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Wed, 3 Aug 2022 14:05:24 +0300 Subject: [PATCH 5/5] Fix e2e config --- pkg/test/e2e/pipline/flp-config.yaml | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/test/e2e/pipline/flp-config.yaml b/pkg/test/e2e/pipline/flp-config.yaml index 1531175b2..2ade78c65 100644 --- a/pkg/test/e2e/pipline/flp-config.yaml +++ b/pkg/test/e2e/pipline/flp-config.yaml @@ -11,8 +11,6 @@ data: hostname: 0.0.0.0 port: 2055 portLegacy: 2056 - decoder: - type: json type: collector name: ingest_collector - name: transform_generic