Skip to content

Commit

Permalink
Make unmarshaling json config strict (#279)
Browse files Browse the repository at this point in the history
* Replace yaml.Unmarshal() with strict version

* Replace json.Unmarshal() with a strict version in config

* Mark DeserializeJSONToMap() as a helper func

* Update generated files

* Fix e2e config
  • Loading branch information
ronensc committed Aug 3, 2022
1 parent d7656a2 commit fb310b7
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 61 deletions.
50 changes: 25 additions & 25 deletions contrib/kubernetes/flowlogs-pipeline.conf.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
# This file was generated automatically by flowlogs-pipeline confgenerator
log-level: error
pipeline:
- name: ingest_collector
- name: transform_generic
follows: ingest_collector
- name: transform_network
follows: transform_generic
- name: extract_aggregate
follows: transform_network
- name: encode_prom
follows: extract_aggregate
- name: write_loki
follows: transform_network
parameters:
- ingest:
- name: ingest_collector
ingest:
type: collector
collector:
hostname: 0.0.0.0
hostName: 0.0.0.0
port: 2055
portLegacy: 2056
type: collector
name: ingest_collector
- name: transform_generic
transform:
type: generic
generic:
policy: replace_keys
rules:
Expand All @@ -31,9 +44,9 @@ parameters:
output: srcAS
- input: DstAS
output: dstAS
type: generic
- name: transform_network
transform:
type: network
network:
rules:
- input: dstPort
Expand Down Expand Up @@ -71,8 +84,9 @@ parameters:
- input: dstIP
output: dstLocation
type: add_location
type: network
- extract:
- name: extract_aggregate
extract:
type: aggregates
aggregates:
- name: bandwidth_network_service
by:
Expand Down Expand Up @@ -140,9 +154,9 @@ parameters:
by:
- service
operation: count
type: aggregates
name: extract_aggregate
- encode:
- name: encode_prom
encode:
type: prom
prom:
metrics:
- name: bandwidth_per_network_service
Expand Down Expand Up @@ -292,25 +306,11 @@ parameters:
buckets: []
port: 9102
prefix: flp_
type: prom
name: encode_prom
- name: write_loki
write:
type: loki
loki:
url: http://loki.default.svc.cluster.local:3100
staticLabels:
job: flowlogs-pipeline
type: loki
pipeline:
- name: ingest_collector
- follows: ingest_collector
name: transform_generic
- follows: transform_generic
name: transform_network
- follows: transform_network
name: extract_aggregate
- follows: extract_aggregate
name: encode_prom
- follows: transform_network
name: write_loki

32 changes: 16 additions & 16 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ and the transformation to generate the exported metric.
|:---|:---|
| **Details** | Sum bytes for all traffic per network service |
| **Usage** | Evaluate network usage breakdown per network service |
| **Labels** | bandwidth, graph, rate, network-service |
| **Tags** | bandwidth, graph, rate, network-service |
| **Operation** | aggregate by `service` and `sum` field `bytes` |
| **Exposed as** | `flp_bandwidth_per_network_service` of type `counter` |
| **Visualized as** | "Bandwidth per network service" on dashboard `details` |
Expand All @@ -29,7 +29,7 @@ and the transformation to generate the exported metric.
|:---|:---|
| **Details** | Sum bandwidth bytes for all traffic per source / destination subnet pair |
| **Usage** | Evaluate network usage breakdown per source / destination subnet pair |
| **Labels** | bandwidth, graph, rate, subnet |
| **Tags** | bandwidth, graph, rate, subnet |
| **Operation** | aggregate by `dstSubnet24, srcSubnet24` and `sum` field `bytes` |
| **Exposed as** | `flp_bandwidth_per_source_destination_subnet` of type `counter` |
| **Visualized as** | "Bandwidth per src and destination subnet" on dashboard `details` |
Expand All @@ -41,7 +41,7 @@ and the transformation to generate the exported metric.
|:---|:---|
| **Details** | Sum bytes for all traffic per source subnet |
| **Usage** | Evaluate network usage breakdown per source subnet |
| **Labels** | bandwidth, graph, rate, subnet |
| **Tags** | bandwidth, graph, rate, subnet |
| **Operation** | aggregate by `srcSubnet` and `sum` field `bytes` |
| **Exposed as** | `flp_bandwidth_per_source_subnet` of type `counter` |
| **Visualized as** | "Bandwidth per source subnet" on dashboard `details` |
Expand All @@ -53,7 +53,7 @@ and the transformation to generate the exported metric.
|:---|:---|
| **Details** | Counts the number of connections per subnet with network prefix length /16 (using conn_tracking sum isNewFlow field) |
| **Usage** | Evaluate network connections per subnet |
| **Labels** | rate, subnet |
| **Tags** | rate, subnet |
| **Operation** | aggregate by `dstSubnet` and `count` field `isNewFlow` |
| **Exposed as** | `flp_connections_per_destination_subnet` of type `counter` |
| **Visualized as** | "Connections rate per destinationIP /16 subnets" on dashboard `details` |
Expand All @@ -65,7 +65,7 @@ and the transformation to generate the exported metric.
|:---|:---|
| **Details** | Counts the number of connections per subnet with network prefix length /16 |
| **Usage** | Evaluate network connections per subnet |
| **Labels** | rate, subnet |
| **Tags** | rate, subnet |
| **Operation** | aggregate by `srcSubnet` and `count` |
| **Exposed as** | `flp_connections_per_source_subnet` of type `counter` |
| **Visualized as** | "Connections rate per sourceIP /16 subnets" on dashboard `details` |
Expand All @@ -77,7 +77,7 @@ and the transformation to generate the exported metric.
|:---|:---|
| **Details** | Counts the number of connections per tcp flags |
| **Usage** | Evaluate difference in connections rate of different TCP Flags. Can be used, for example, to identify syn-attacks. |
| **Labels** | rate, TCPFlags |
| **Tags** | rate, TCPFlags |
| **Operation** | aggregate by `TCPFlags` and `count` |
| **Exposed as** | `flp_connections_per_tcp_flags` of type `counter` |
| **Visualized as** | "Connections rate per TCPFlags" on dashboard `details` |
Expand All @@ -89,7 +89,7 @@ and the transformation to generate the exported metric.
|:---|:---|
| **Details** | Aggregates flow records by values of "DstAS" field and counts the number of entries in each aggregate with non zero value |
| **Usage** | Evaluate amount of connections targeted at different Autonomous Systems |
| **Labels** | rate, count, AS |
| **Tags** | rate, count, AS |
| **Operation** | aggregate by `dstAS` and `count` |
| **Exposed as** | `flp_connections_per_destination_as` of type `counter` |
| **Visualized as** | "Connections rate per destination AS" on dashboard `details` |
Expand All @@ -101,7 +101,7 @@ and the transformation to generate the exported metric.
|:---|:---|
| **Details** | Aggregates flow records by values of "SrcAS" field and counts the number of entries in each aggregate with non zero value |
| **Usage** | Evaluate amount of connections initiated by different Autonomous Systems |
| **Labels** | rate, count, AS |
| **Tags** | rate, count, AS |
| **Operation** | aggregate by `srcAS` and `count` |
| **Exposed as** | `flp_connections_per_source_as` of type `counter` |
| **Visualized as** | "Connections rate per source AS" on dashboard `details` |
Expand All @@ -113,7 +113,7 @@ and the transformation to generate the exported metric.
|:---|:---|
| **Details** | Count the number of distinct source / destination subnet pairs |
| **Usage** | Evaluate network usage breakdown per source / destination subnet pair |
| **Labels** | count, graph, rate, subnet |
| **Tags** | count, graph, rate, subnet |
| **Operation** | aggregate by `dstSubnet24, srcSubnet24` and `count` |
| **Exposed as** | `flp_count_per_source_destination_subnet` of type `counter` |
| **Visualized as** | "Connections rate of src / destination subnet occurences" on dashboard `details` |
Expand All @@ -125,7 +125,7 @@ and the transformation to generate the exported metric.
|:---|:---|
| **Details** | Sum egress bytes for all traffic per destination subnet |
| **Usage** | Evaluate network usage breakdown per destination subnet |
| **Labels** | bandwidth, graph, rate, subnet |
| **Tags** | bandwidth, graph, rate, subnet |
| **Operation** | aggregate by `dstSubnet` and `sum` field `bytes` |
| **Exposed as** | `flp_egress_per_destination_subnet` of type `counter` |
| **Visualized as** | "Bandwidth per destination subnet" on dashboard `details` |
Expand All @@ -138,7 +138,7 @@ and the transformation to generate the exported metric.
|:---|:---|
| **Details** | Sum egress bytes for all traffic per namespace |
| **Usage** | Evaluate network usage breakdown per namespace |
| **Labels** | kubernetes, bandwidth, graph |
| **Tags** | kubernetes, bandwidth, graph |
| **Operation** | aggregate by `srcK8S_Namespace, srcK8S_Type` and `sum` field `bytes` |
| **Exposed as** | `flp_egress_per_namespace` of type `counter` |
| **Visualized as** | "Bandwidth per namespace" on dashboard `details` |
Expand All @@ -150,7 +150,7 @@ and the transformation to generate the exported metric.
|:---|:---|
| **Details** | Flows length distribution over time |
| **Usage** | Evaluate flows length behavior including mice/elephant use-case |
| **Labels** | bandwidth, mice, elephant, rate |
| **Tags** | bandwidth, mice, elephant, rate |
| **Operation** | aggregate by `all_Evaluate` and `raw_values` field `bytes` |
| **Exposed as** | `flp_flows_length_histogram` of type `histogram` |
| **Visualized as** | "Flows length heatmap" on dashboard `details` |
Expand All @@ -163,7 +163,7 @@ and the transformation to generate the exported metric.
|:---|:---|
| **Details** | Counts the number of connections per geo-location based on destination IP |
| **Usage** | Evaluate network connections geo-location |
| **Labels** | rate, connections-count, geo-location, destinationIP |
| **Tags** | rate, connections-count, geo-location, destinationIP |
| **Operation** | aggregate by `dstLocation_CountryName` and `count` |
| **Exposed as** | `flp_connections_per_destination_location` of type `counter` |
| **Visualized as** | "Connections rate per destinationIP geo-location" on dashboard `details` |
Expand All @@ -175,7 +175,7 @@ and the transformation to generate the exported metric.
|:---|:---|
| **Details** | Sum bytes for all traffic per source namespace |
| **Usage** | Evaluate network usage breakdown per source namespace |
| **Labels** | loki, graph, rate, namespace |
| **Tags** | loki, graph, rate, namespace |
| **Visualized as** | "Bandwidth per source namespace" on dashboard `details` |
|||

Expand All @@ -185,7 +185,7 @@ and the transformation to generate the exported metric.
|:---|:---|
| **Details** | Rate of loki logs per sec |
| **Usage** | Evaluate loki service usage |
| **Labels** | loki, graph, rate |
| **Tags** | loki, graph, rate |
| **Visualized as** | "Loki logs rate" on dashboard `details` |
|||

Expand All @@ -195,7 +195,7 @@ and the transformation to generate the exported metric.
|:---|:---|
| **Details** | Counts the number of connections per network service based on destination port number and protocol |
| **Usage** | Evaluate network services |
| **Labels** | rate, network-services, destination-port, destination-protocol |
| **Tags** | rate, network-services, destination-port, destination-protocol |
| **Operation** | aggregate by `service` and `count` |
| **Exposed as** | `flp_service_count` of type `counter` |
| **Visualized as** | "Network services connections rate" on dashboard `details` |
Expand Down
4 changes: 2 additions & 2 deletions pkg/confgen/confgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ func (cg *ConfGen) ParseDefinition(name string, bytes []byte) error {

// parse yaml
var defFile DefFile
err = yaml.Unmarshal(bytes, &defFile)
err = yaml.UnmarshalStrict(bytes, &defFile)
if err != nil {
log.Debugf("%s yaml.Unmarshal err: %v ", name, err)
log.Debugf("%s yaml.UnmarshalStrict err: %v ", name, err)
return err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/confgen/confgen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func Test_RunShortConfGen(t *testing.T) {
destCfgBytes, err := ioutil.ReadFile(configOut)
require.NoError(t, err)
var out config.ConfigFileStruct
err = yaml.Unmarshal(destCfgBytes, &out)
err = yaml.UnmarshalStrict(destCfgBytes, &out)
require.NoError(t, err)
require.Len(t, out.Pipeline, 4)
require.Len(t, out.Parameters, 4)
Expand Down Expand Up @@ -280,7 +280,7 @@ func Test_RunLongConfGen(t *testing.T) {
destCfgBytes, err := ioutil.ReadFile(configOut)
require.NoError(t, err)
var out config.ConfigFileStruct
err = yaml.Unmarshal(destCfgBytes, &out)
err = yaml.UnmarshalStrict(destCfgBytes, &out)
require.NoError(t, err)
require.Len(t, out.Parameters, 6)
require.Len(t, out.Pipeline, 6)
Expand Down
5 changes: 2 additions & 3 deletions pkg/confgen/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
package confgen

import (
"encoding/json"

jsoniter "github.com/json-iterator/go"
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
log "github.com/sirupsen/logrus"
)

Expand All @@ -35,7 +34,7 @@ func (cg *ConfGen) parseEncode(encode *map[string]interface{}) (*api.PromEncode,
}

var prom api.PromEncode
err = json.Unmarshal(b, &prom)
err = config.JsonUnmarshalStrict(b, &prom)
if err != nil {
log.Debugf("Unmarshal aggregate.Definitions err: %v ", err)
return nil, err
Expand Down
5 changes: 2 additions & 3 deletions pkg/confgen/extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
package confgen

import (
"encoding/json"

jsoniter "github.com/json-iterator/go"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/aggregate"
log "github.com/sirupsen/logrus"
)
Expand All @@ -35,7 +34,7 @@ func (cg *ConfGen) parseExtract(extract *map[string]interface{}) (*aggregate.Def
}

var jsonNetworkAggregate aggregate.Definitions
err = json.Unmarshal(b, &jsonNetworkAggregate)
err = config.JsonUnmarshalStrict(b, &jsonNetworkAggregate)
if err != nil {
log.Debugf("Unmarshal aggregate.Definitions err: %v ", err)
return nil, err
Expand Down
5 changes: 2 additions & 3 deletions pkg/confgen/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
package confgen

import (
"encoding/json"

jsoniter "github.com/json-iterator/go"
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
log "github.com/sirupsen/logrus"
)

Expand All @@ -34,7 +33,7 @@ func (cg *ConfGen) parseTransport(transform *map[string]interface{}) (*api.Trans
}

var jsonNetworkTransform api.TransformNetwork
err = json.Unmarshal(b, &jsonNetworkTransform)
err = config.JsonUnmarshalStrict(b, &jsonNetworkTransform)
if err != nil {
log.Debugf("Unmarshal transform.TransformNetwork err: %v ", err)
return nil, err
Expand Down
5 changes: 2 additions & 3 deletions pkg/confgen/visualization.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
package confgen

import (
"encoding/json"

jsoniter "github.com/json-iterator/go"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -62,7 +61,7 @@ func (cg *ConfGen) parseVisualization(visualization *Visualization) (*Visualizat
}

var jsonVisualization Visualization
err = json.Unmarshal(b, &jsonVisualization)
err = config.JsonUnmarshalStrict(b, &jsonVisualization)
if err != nil {
log.Debugf("Unmarshal aggregate.Definitions err: %v ", err)
return nil, err
Expand Down
14 changes: 12 additions & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package config

import (
"bytes"
"encoding/json"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
Expand Down Expand Up @@ -99,18 +100,27 @@ func ParseConfig(opts Options) (ConfigFileStruct, error) {
out := ConfigFileStruct{}

logrus.Debugf("opts.PipeLine = %v ", opts.PipeLine)
err := json.Unmarshal([]byte(opts.PipeLine), &out.Pipeline)
err := JsonUnmarshalStrict([]byte(opts.PipeLine), &out.Pipeline)
if err != nil {
logrus.Errorf("error when reading config file: %v", err)
return out, err
}
logrus.Debugf("stages = %v ", out.Pipeline)

err = json.Unmarshal([]byte(opts.Parameters), &out.Parameters)
err = JsonUnmarshalStrict([]byte(opts.Parameters), &out.Parameters)
if err != nil {
logrus.Errorf("error when reading config file: %v", err)
return out, err
}
logrus.Debugf("params = %v ", out.Parameters)
return out, nil
}

// JsonUnmarshalStrict is like Unmarshal except that any fields that are found
// in the data that do not have corresponding struct members, or mapping
// keys that are duplicates, will result in an error.
func JsonUnmarshalStrict(data []byte, v interface{}) error {
dec := json.NewDecoder(bytes.NewReader(data))
dec.DisallowUnknownFields()
return dec.Decode(v)
}
Loading

0 comments on commit fb310b7

Please sign in to comment.