Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-386 IP categorization #359

Merged
merged 2 commits into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions pkg/api/transform_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
package api

type TransformNetwork struct {
Rules NetworkTransformRules `yaml:"rules" json:"rules" doc:"list of transform rules, each includes:"`
KubeConfigPath string `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"`
ServicesFile string `yaml:"servicesFile,omitempty" json:"servicesFile,omitempty" doc:"path to services file (optional, default: /etc/services)"`
ProtocolsFile string `yaml:"protocolsFile,omitempty" json:"protocolsFile,omitempty" doc:"path to protocols file (optional, default: /etc/protocols)"`
DirectionInfo DirectionInfo `yaml:"directionInfo,omitempty" json:"directionInfo,omitempty" doc:"information to reinterpret flow direction (optional, to use with reinterpret_direction rule)"`
Rules NetworkTransformRules `yaml:"rules" json:"rules" doc:"list of transform rules, each includes:"`
KubeConfigPath string `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"`
ServicesFile string `yaml:"servicesFile,omitempty" json:"servicesFile,omitempty" doc:"path to services file (optional, default: /etc/services)"`
ProtocolsFile string `yaml:"protocolsFile,omitempty" json:"protocolsFile,omitempty" doc:"path to protocols file (optional, default: /etc/protocols)"`
IPCategories []NetworkTransformIPCategory `yaml:"ipCategories,omitempty" json:"ipCategories,omitempty" doc:"configure IP categories"`
DirectionInfo NetworkTransformDirectionInfo `yaml:"directionInfo,omitempty" json:"directionInfo,omitempty" doc:"information to reinterpret flow direction (optional, to use with reinterpret_direction rule)"`
}

func (tn *TransformNetwork) GetServiceFiles() (string, string) {
Expand All @@ -45,6 +46,7 @@ const (
OpAddService = "add_service"
OpAddKubernetes = "add_kubernetes"
OpReinterpretDirection = "reinterpret_direction"
OpAddIPCategory = "add_ip_category"
)

type TransformNetworkOperationEnum struct {
Expand All @@ -55,6 +57,7 @@ type TransformNetworkOperationEnum struct {
AddService string `yaml:"add_service" json:"add_service" doc:"add output network service field from input port and parameters protocol field"`
AddKubernetes string `yaml:"add_kubernetes" json:"add_kubernetes" doc:"add output kubernetes fields from input"`
ReinterpretDirection string `yaml:"reinterpret_direction" json:"reinterpret_direction" doc:"reinterpret flow direction at a higher level than the interface"`
AddIPCategory string `yaml:"add_ip_category" json:"add_ip_category" doc:"categorize IPs based on known subnets configuration"`
}

func TransformNetworkOperationName(operation string) string {
Expand All @@ -69,7 +72,7 @@ type NetworkTransformRule struct {
Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"`
}

type DirectionInfo struct {
type NetworkTransformDirectionInfo struct {
ReporterIPField string `yaml:"reporterIPField,omitempty" json:"reporterIPField,omitempty" doc:"field providing the reporter (agent) host IP"`
SrcHostField string `yaml:"srcHostField,omitempty" json:"srcHostField,omitempty" doc:"source host field"`
DstHostField string `yaml:"dstHostField,omitempty" json:"dstHostField,omitempty" doc:"destination host field"`
Expand All @@ -78,3 +81,8 @@ type DirectionInfo struct {
}

type NetworkTransformRules []NetworkTransformRule

// NetworkTransformIPCategory associates a category name with a list of CIDRs,
// used by the add_ip_category transform rule to label flow IPs that fall
// inside one of the configured subnets.
type NetworkTransformIPCategory struct {
CIDRs []string `yaml:"cidrs,omitempty" json:"cidrs,omitempty" doc:"list of CIDRs to match a category"`
Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"name of the category"`
}
12 changes: 6 additions & 6 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
log "github.com/sirupsen/logrus"
)

const defaultExpiryTime = 120
const defaultExpiryTime = 2 * time.Minute

type gaugeInfo struct {
gauge *prometheus.GaugeVec
Expand All @@ -57,7 +57,7 @@ type EncodeProm struct {
counters []counterInfo
histos []histoInfo
aggHistos []histoInfo
expiryTime int64
expiryTime time.Duration
mCache *utils.TimedCache
exitChan <-chan struct{}
server *http.Server
Expand Down Expand Up @@ -232,14 +232,14 @@ func (e *EncodeProm) Cleanup(cleanupFunc interface{}) {
}

func (e *EncodeProm) cleanupExpiredEntriesLoop() {
ticker := time.NewTicker(time.Duration(e.expiryTime) * time.Second)
ticker := time.NewTicker(e.expiryTime)
for {
select {
case <-e.exitChan:
log.Debugf("exiting cleanupExpiredEntriesLoop because of signal")
return
case <-ticker.C:
e.mCache.CleanupExpiredEntries(e.expiryTime, e)
e.mCache.CleanupExpiredEntries(e.expiryTime, e.Cleanup)
}
}
}
Expand Down Expand Up @@ -274,11 +274,11 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
cfg = *params.Encode.Prom
}

expiryTime := int64(cfg.ExpiryTime)
expiryTime := time.Duration(cfg.ExpiryTime) * time.Second
if expiryTime == 0 {
expiryTime = defaultExpiryTime
}
log.Debugf("expiryTime = %d", expiryTime)
log.Debugf("expiryTime = %v", expiryTime)

counters := []counterInfo{}
gauges := []gaugeInfo{}
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/encode/encode_prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func Test_NewEncodeProm(t *testing.T) {
require.Equal(t, 1, len(encodeProm.gauges))
require.Equal(t, 1, len(encodeProm.histos))
require.Equal(t, 1, len(encodeProm.aggHistos))
require.Equal(t, int64(1), encodeProm.expiryTime)
require.Equal(t, time.Second, encodeProm.expiryTime)
require.Equal(t, (*api.PromTLSConf)(nil), encodeProm.tlsConfig)

require.Equal(t, encodeProm.gauges[0].info.Name, "Bytes")
Expand All @@ -125,7 +125,7 @@ func Test_NewEncodeProm(t *testing.T) {

// wait a couple seconds so that the entry will expire
time.Sleep(2 * time.Second)
encodeProm.mCache.CleanupExpiredEntries(encodeProm.expiryTime, encodeProm)
encodeProm.mCache.CleanupExpiredEntries(encodeProm.expiryTime, encodeProm.Cleanup)
entriesMapLen = encodeProm.mCache.GetCacheLen()
require.Equal(t, 0, entriesMapLen)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/pipeline/extract/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
Expand All @@ -47,7 +48,7 @@ type Aggregate struct {
Definition api.AggregateDefinition
cache *utils.TimedCache
mutex *sync.Mutex
expiryTime int64
expiryTime time.Duration
}

type GroupState struct {
Expand Down
3 changes: 2 additions & 1 deletion pkg/pipeline/extract/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package aggregate
import (
"sync"
"testing"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
Expand All @@ -38,7 +39,7 @@ func GetMockAggregate() Aggregate {
},
cache: utils.NewTimedCache(),
mutex: &sync.Mutex{},
expiryTime: 30,
expiryTime: 30 * time.Second,
}
return aggregate
}
Expand Down
12 changes: 5 additions & 7 deletions pkg/pipeline/extract/aggregate/aggregates.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ import (
log "github.com/sirupsen/logrus"
)

var defaultExpiryTime = 60 * 10 // 10 minutes
var defaultExpiryTime = 10 * time.Minute

type Aggregates struct {
Aggregates []Aggregate
expiryTime int64
expiryTime time.Duration
}

type Definitions []api.AggregateDefinition
Expand Down Expand Up @@ -72,7 +72,7 @@ func (aggregates *Aggregates) AddAggregate(aggregateDefinition api.AggregateDefi

func (aggregates *Aggregates) cleanupExpiredEntriesLoop() {

ticker := time.NewTicker(time.Duration(aggregates.expiryTime) * time.Second)
ticker := time.NewTicker(aggregates.expiryTime)
go func() {
for {
select {
Expand All @@ -86,18 +86,16 @@ func (aggregates *Aggregates) cleanupExpiredEntriesLoop() {
}

func (aggregates *Aggregates) cleanupExpiredEntries() {

for _, aggregate := range aggregates.Aggregates {
aggregate.mutex.Lock()
aggregate.cache.CleanupExpiredEntries(aggregate.expiryTime, aggregate)
aggregate.cache.CleanupExpiredEntries(aggregate.expiryTime, aggregate.Cleanup)
aggregate.mutex.Unlock()
}

}

func NewAggregatesFromConfig(definitions []api.AggregateDefinition) (Aggregates, error) {
aggregates := Aggregates{
expiryTime: int64(defaultExpiryTime),
expiryTime: defaultExpiryTime,
}

for _, aggregateDefinition := range definitions {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/extract/aggregate/aggregates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func Test_NewAggregatesFromConfig(t *testing.T) {

func Test_CleanupExpiredEntriesLoop(t *testing.T) {

defaultExpiryTime = 4 // expiration after 4 seconds
defaultExpiryTime = 4 * time.Second // expiration after 4 seconds
aggregates := initAggregates(t)
expectedAggregate := GetMockAggregate()
require.Equal(t, expectedAggregate.Definition, aggregates.Aggregates[0].Definition)
Expand Down
58 changes: 55 additions & 3 deletions pkg/pipeline/transform/transform_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,30 @@ import (
"os"
"regexp"
"strconv"
"time"

"github.com/Knetic/govaluate"
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/location"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/netdb"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/sirupsen/logrus"
)

var log = logrus.WithField("component", "transform.Network")

type Network struct {
api.TransformNetwork
svcNames *netdb.ServiceNames
svcNames *netdb.ServiceNames
categories []subnetCategory
ipCatCache *utils.TimedCache
}

// subnetCategory is the parsed, internal form of api.NetworkTransformIPCategory:
// the category name plus its CIDRs pre-parsed into *net.IPNet for fast matching.
type subnetCategory struct {
cidrs []*net.IPNet
name string
}

func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bool) {
Expand Down Expand Up @@ -143,6 +152,15 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo
}
case api.OpReinterpretDirection:
reinterpretDirection(outputEntry, &n.DirectionInfo)
case api.OpAddIPCategory:
if strIP, ok := outputEntry[rule.Input].(string); ok {
cat, ok := n.ipCatCache.GetCacheEntry(strIP)
if !ok {
cat = n.categorizeIP(net.ParseIP(strIP))
n.ipCatCache.UpdateCacheEntry(strIP, cat)
}
outputEntry[rule.Output] = cat
}

default:
log.Panicf("unknown type %s for transform.Network rule: %v", rule.Type, rule)
Expand All @@ -152,6 +170,19 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo
return outputEntry, true
}

// categorizeIP returns the name of the first configured category that has a
// CIDR containing the given IP. It returns an empty string when the IP is nil
// or matches none of the configured subnets.
func (n *Network) categorizeIP(ip net.IP) string {
	if ip == nil {
		return ""
	}
	for _, category := range n.categories {
		for _, subnet := range category.cidrs {
			if subnet.Contains(ip) {
				return category.name
			}
		}
	}
	return ""
}

// NewTransformNetwork create a new transform
func NewTransformNetwork(params config.StageParam) (Transformer, error) {
var needToInitLocationDB = false
Expand All @@ -171,9 +202,13 @@ func NewTransformNetwork(params config.StageParam) (Transformer, error) {
case api.OpAddService:
needToInitNetworkServices = true
case api.OpReinterpretDirection:
if err := validatereinterpretDirectionConfig(&jsonNetworkTransform.DirectionInfo); err != nil {
if err := validateReinterpretDirectionConfig(&jsonNetworkTransform.DirectionInfo); err != nil {
return nil, err
}
case api.OpAddIPCategory:
if len(jsonNetworkTransform.IPCategories) == 0 {
return nil, fmt.Errorf("a rule '%s' was found, but there are no IP categories configured", api.OpAddIPCategory)
}
}
}

Expand Down Expand Up @@ -211,11 +246,28 @@ func NewTransformNetwork(params config.StageParam) (Transformer, error) {
}
}

var subnetCats []subnetCategory
for _, category := range jsonNetworkTransform.IPCategories {
var cidrs []*net.IPNet
for _, cidr := range category.CIDRs {
_, parsed, err := net.ParseCIDR(cidr)
if err != nil {
return nil, fmt.Errorf("category %s: fail to parse CIDR, %w", category.Name, err)
}
cidrs = append(cidrs, parsed)
}
if len(cidrs) > 0 {
subnetCats = append(subnetCats, subnetCategory{name: category.Name, cidrs: cidrs})
}
}

return &Network{
TransformNetwork: api.TransformNetwork{
Rules: jsonNetworkTransform.Rules,
DirectionInfo: jsonNetworkTransform.DirectionInfo,
},
svcNames: servicesDB,
svcNames: servicesDB,
categories: subnetCats,
ipCatCache: utils.NewQuietExpiringTimedCache(2 * time.Minute),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this timeout be configurable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's important to make it configurable, it won't have any impact on the results (unlike conn-tracking timeout, for instance, where timeout impacts updates granularity) - this one is really an implementation detail

}, nil
}
4 changes: 2 additions & 2 deletions pkg/pipeline/transform/transform_network_direction.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const (
egress = 1
)

func validatereinterpretDirectionConfig(info *api.DirectionInfo) error {
func validateReinterpretDirectionConfig(info *api.NetworkTransformDirectionInfo) error {
if info.FlowDirectionField == "" {
return fmt.Errorf("invalid config for transform.Network rule %s: missing FlowDirectionField", api.OpReinterpretDirection)
}
Expand All @@ -28,7 +28,7 @@ func validatereinterpretDirectionConfig(info *api.DirectionInfo) error {
return nil
}

func reinterpretDirection(output config.GenericMap, info *api.DirectionInfo) {
func reinterpretDirection(output config.GenericMap, info *api.NetworkTransformDirectionInfo) {
if fd, ok := output[info.FlowDirectionField]; ok && len(info.IfDirectionField) > 0 {
output[info.IfDirectionField] = fd
}
Expand Down
Loading