Skip to content

Commit

Permalink
NETOBSERV-759: decorate flows with agent IP
Browse files Browse the repository at this point in the history
only for Protocol Buffers encoding. IPFIX is still missing.
  • Loading branch information
Mario Macias committed Dec 19, 2022
1 parent 0e3f5ea commit 94114f0
Show file tree
Hide file tree
Showing 18 changed files with 460 additions and 75 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ IMG_SHA = $(IMAGE_TAG_BASE):$(BUILD_SHA)
LOCAL_GENERATOR_IMAGE ?= ebpf-generator:latest

CILIUM_EBPF_VERSION := v0.8.1
GOLANGCI_LINT_VERSION = v1.46.2
GOLANGCI_LINT_VERSION = v1.50.1

CLANG ?= clang
CFLAGS := -O2 -g -Wall -Werror $(CFLAGS)
Expand Down
4 changes: 3 additions & 1 deletion docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,7 @@ flowchart TD
DD --> |"chan []*flow.Record"| CL(flow.CapacityLimiter)
CL --> |"chan []*flow.Record"| EX("export.GRPCProto<br/>or<br/>export.KafkaProto")
CL --> |"chan []*flow.Record"| DC(flow.Decorator)
DC --> |"chan []*flow.Record"| EX("export.GRPCProto<br/>or<br/>export.KafkaProto")
```
8 changes: 8 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ The following environment variables are available to configure the NetObserv eBF
* `EXPORT` (default: `grpc`). Flows' exporter protocol. Accepted values are: `grpc` or `kafka` or `ipfix+tcp` or `ipfix+udp`.
* `FLOWS_TARGET_HOST` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Host name or IP of the target Flow collector.
* `FLOWS_TARGET_PORT` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Port of the target flow collector.
* `AGENT_IP` (optional). Allows overriding the reported Agent IP address on each flow.
* `AGENT_IP_IFACE` (default: `external`). Specifies which interface should the agent pick the IP
address from in order to report it in the AgentIP field on each flow. Accepted values are:
`external` (default), `local`, or `name:<interface name>` (e.g. `name:eth0`). If the `AGENT_IP`
configuration property is set, this property has no effect.
* `AGENT_IP_TYPE` (default: `any`). Specifies which type of IP address (IPv4 or IPv6 or any) should
the agent report in the AgentID field of each flow. Accepted values are: `any` (default), `ipv4`,
`ipv6`. If the `AGENT_IP` configuration property is set, this property has no effect.
* `INTERFACES` (optional). Comma-separated list of the interface names from where flows will be collected. If
empty, the agent will use all the interfaces in the system, excepting the ones listed in
the `EXCLUDE_INTERFACES` variable.
Expand Down
44 changes: 32 additions & 12 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"net"
"time"

"github.com/cilium/ebpf/ringbuf"
Expand Down Expand Up @@ -65,6 +66,10 @@ type Flows struct {
accounter *flow.Accounter
exporter flowExporter

// elements used to decorate flows with extra information
interfaceNamer flow.InterfaceNamer
agentIP net.IP

status Status
}

Expand Down Expand Up @@ -101,6 +106,13 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
informer = ifaces.NewWatcher(cfg.BuffersLength)
}

alog.Debug("acquiring Agent IP")
agentIP, err := fetchAgentIP(cfg)
if err != nil {
return nil, fmt.Errorf("acquiring Agent IP: %w", err)
}
alog.Debug("agent IP: " + agentIP.String())

// configure selected exporter
exportFunc, err := buildFlowExporter(cfg)
if err != nil {
Expand All @@ -119,14 +131,15 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
return nil, err
}

return flowsAgent(cfg, informer, fetcher, exportFunc)
return flowsAgent(cfg, informer, fetcher, exportFunc, agentIP)
}

// flowsAgent is a private constructor with injectable dependencies, usable for tests
func flowsAgent(cfg *Config,
informer ifaces.Informer,
fetcher ebpfFlowFetcher,
exporter flowExporter,
agentIP net.IP,
) (*Flows, error) {
// configure allow/deny interfaces filter
filter, err := initInterfaceFilter(cfg.Interfaces, cfg.ExcludeInterfaces)
Expand All @@ -144,19 +157,21 @@ func flowsAgent(cfg *Config,
return iface
}

mapTracer := flow.NewMapTracer(fetcher, interfaceNamer, cfg.CacheActiveTimeout)
mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout)
rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout)
accounter := flow.NewAccounter(
cfg.CacheMaxFlows, cfg.CacheActiveTimeout, interfaceNamer, time.Now, monotime.Now)
cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now)
return &Flows{
ebpf: fetcher,
exporter: exporter,
interfaces: registerer,
filter: filter,
cfg: cfg,
mapTracer: mapTracer,
rbTracer: rbTracer,
accounter: accounter,
ebpf: fetcher,
exporter: exporter,
interfaces: registerer,
filter: filter,
cfg: cfg,
mapTracer: mapTracer,
rbTracer: rbTracer,
accounter: accounter,
agentIP: agentIP,
interfaceNamer: interfaceNamer,
}, nil
}

Expand Down Expand Up @@ -345,6 +360,9 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal, erro
limiter := node.AsMiddle((&flow.CapacityLimiter{}).Limit,
node.ChannelBufferLen(f.cfg.BuffersLength))

decorator := node.AsMiddle(flow.Decorate(f.agentIP, f.interfaceNamer),
node.ChannelBufferLen(f.cfg.BuffersLength))

ebl := f.cfg.ExporterBufferLength
if ebl == 0 {
ebl = f.cfg.BuffersLength
Expand All @@ -365,7 +383,9 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal, erro
mapTracer.SendsTo(limiter)
accounter.SendsTo(limiter)
}
limiter.SendsTo(export)
limiter.SendsTo(decorator)
decorator.SendsTo(export)

alog.Debug("starting graph")
mapTracer.Start()
rbTracer.Start()
Expand Down
28 changes: 27 additions & 1 deletion pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package agent

import (
"context"
"net"
"testing"
"time"

Expand All @@ -13,6 +14,8 @@ import (
"github.com/stretchr/testify/require"
)

var agentIP = "192.168.1.13"

const timeout = 2 * time.Second

func TestFlowsAgent_InvalidConfigs(t *testing.T) {
Expand Down Expand Up @@ -171,14 +174,37 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) {
assert.Lenf(t, key1Flows, 2, "both key1 flows should have been forwarded: %#v", key1Flows)
}

func TestFlowsAgent_Decoration(t *testing.T) {
export := testAgent(t, &Config{
CacheActiveTimeout: 10 * time.Millisecond,
CacheMaxFlows: 100,
})

exported := export.Get(t, timeout)
assert.Len(t, exported, 3)

// Tests that the decoration stage has been properly executed. It should
// add the interface name and the agent IP
for _, f := range exported {
assert.Equal(t, agentIP, f.AgentIP.String())
switch f.RecordKey {
case key1, key2:
assert.Equal(t, "foo", f.Interface)
default:
assert.Equal(t, "bar", f.Interface)
}
}
}

func testAgent(t *testing.T, cfg *Config) *test.ExporterFake {
ebpf := test.NewTracerFake()
export := test.NewExporterFake()
agent, err := flowsAgent(cfg,
test.SliceInformerFake{
{Name: "foo", Index: 3},
{Name: "bar", Index: 4},
}, ebpf, export.Export)
}, ebpf, export.Export,
net.ParseIP(agentIP))
require.NoError(t, err)

go func() {
Expand Down
19 changes: 19 additions & 0 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,28 @@ const (
DirectionIngress = "ingress"
DirectionEgress = "egress"
DirectionBoth = "both"

IPTypeAny = "any"
IPTypeIPV4 = "ipv4"
IPTypeIPV6 = "ipv6"

IPIfaceExternal = "external"
IPIfaceLocal = "local"
IPIfaceNamedPrefix = "name:"
)

type Config struct {
// AgentIP allows overriding the reported Agent IP address on each flow.
AgentIP string `env:"AGENT_IP"`
// AgentIPIface specifies which interface should the agent pick the IP address from in order to
// report it in the AgentIP field on each flow. Accepted values are: external (default), local,
// or name:<interface name> (e.g. name:eth0).
// If the AgentIP configuration property is set, this property has no effect.
AgentIPIface string `env:"AGENT_IP_IFACE" envDefault:"external"`
// AgentIPType specifies which type of IP address (IPv4 or IPv6 or any) should the agent report
// in the AgentID field of each flow. Accepted values are: any (default), ipv4, ipv6.
// If the AgentIP configuration property is set, this property has no effect.
AgentIPType string `env:"AGENT_IP_TYPE" envDefault:"any"`
// Export selects the flows' exporter protocol. Accepted values are: grpc (default) or kafka
// or ipfix+udp or ipfix+tcp.
Export string `env:"EXPORT" envDefault:"grpc"`
Expand Down
137 changes: 137 additions & 0 deletions pkg/agent/ip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package agent

import (
"fmt"
"net"
"strings"
)

// dependencies that can be injected from testing
var (
interfaceByName = net.InterfaceByName
interfaceAddrs = net.InterfaceAddrs
dial = net.Dial
ifaceAddrs = func(iface *net.Interface) ([]net.Addr, error) {
return iface.Addrs()
}
)

// fetchAgentIP guesses the non-loopback IP address of the Agent host, according to the
// user-provided configuration:
// - If AgentIP is provided, this value is used whatever is the real IP of the Agent.
// - AgentIPIface specifies which interface this function should look into in order to pickup an address.
// - AgentIPType specifies which type of IP address should the agent pickup ("any" to pickup whichever
// ipv4 or ipv6 address is found first)
func fetchAgentIP(cfg *Config) (net.IP, error) {
if cfg.AgentIP != "" {
if ip := net.ParseIP(cfg.AgentIP); ip != nil {
return ip, nil
}
return nil, fmt.Errorf("can't parse provided IP %v", cfg.AgentIP)
}

if cfg.AgentIPType != IPTypeAny &&
cfg.AgentIPType != IPTypeIPV6 &&
cfg.AgentIPType != IPTypeIPV4 {
return nil, fmt.Errorf("invalid IP type %q. Valid values are: %s, %s or %s",
cfg.AgentIPType, IPTypeIPV4, IPTypeIPV6, IPTypeAny)
}

switch cfg.AgentIPIface {
case IPIfaceLocal:
return fromLocal(cfg.AgentIPType)
case IPIfaceExternal:
return fromExternal(cfg.AgentIPType)
default:
if !strings.HasPrefix(cfg.AgentIPIface, IPIfaceNamedPrefix) {
return nil, fmt.Errorf(
"invalid IP interface %q. Valid values are: %s, %s or %s<iface_name>",
cfg.AgentIPIface, IPIfaceLocal, IPIfaceExternal, IPIfaceNamedPrefix)
}
return fromInterface(cfg.AgentIPIface[len(IPIfaceNamedPrefix):], cfg.AgentIPType)
}
}

func fromInterface(ifName, ipType string) (net.IP, error) {
iface, err := interfaceByName(ifName)
if err != nil {
return nil, err
}
addrs, err := ifaceAddrs(iface)
if err != nil {
return nil, err
}
if ip, ok := findAddress(addrs, ipType); ok {
return ip, nil
}
return nil, fmt.Errorf("no matching %q addresses found at interface %v", ipType, ifName)
}

func fromLocal(ipType string) (net.IP, error) {
addrs, err := interfaceAddrs()
if err != nil {
return nil, err
}
if ip, ok := findAddress(addrs, ipType); ok {
return ip, nil
}
return nil, fmt.Errorf("no matching local %q addresses found", ipType)
}

func fromExternal(ipType string) (net.IP, error) {
// We don't really care about the existence or nonexistence of the addresses.
// This will just establish an external dialer where we can pickup the external
// host address
addrStr := "8.8.8.8:80"
if ipType == IPTypeIPV6 {
addrStr = "[2001:4860:4860::8888]:80"
}
conn, err := dial("udp", addrStr)
if err != nil {
return nil, fmt.Errorf("can't establish an external connection %w", err)
}
if addr, ok := conn.LocalAddr().(*net.UDPAddr); !ok {
return nil, fmt.Errorf("unexpected local address type %T for external connection",
conn.LocalAddr())
} else if ip, ok := getIP(addr.IP, ipType); ok {
return ip, nil
}
return nil, fmt.Errorf("no matching %q external addresses found", ipType)
}

func findAddress(addrs []net.Addr, ipType string) (net.IP, bool) {
for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && ipnet != nil {
if ip, ok := getIP(ipnet.IP, ipType); ok {
return ip, true
}
}
}
return nil, false
}

func getIP(pip net.IP, ipType string) (net.IP, bool) {
if pip == nil || pip.IsLoopback() {
return nil, false
}
switch ipType {
case IPTypeIPV4:
if ip := pip.To4(); ip != nil {
return ip, true
}
case IPTypeIPV6:
// as any IP4 address can be converted to IP6, we only return any
// address that can be converted to IP6 but not to IP4
if ip := pip.To16(); ip != nil && pip.To4() == nil {
return ip, true
}
default: // Any
if ip := pip.To4(); ip != nil {
return ip, true
}
if ip := pip.To16(); ip != nil {
return ip, true
}
}
return nil, false
}
Loading

0 comments on commit 94114f0

Please sign in to comment.