From aa7838d82f4d811b8ff2f7b4e1abdbf122e3557f Mon Sep 17 00:00:00 2001 From: Mario Macias Date: Tue, 20 Dec 2022 16:54:55 +0100 Subject: [PATCH] NETOBSERV-759: decorate flows with agent IP (#78) * NETOBSERV-759: decorate flows with agent IP only for Protocol Buffers encoding. IPFIX is still missing. * Fix GRPC export + test --- Makefile | 2 +- docs/architecture.md | 4 +- docs/config.md | 8 ++ pkg/agent/agent.go | 44 +++++++--- pkg/agent/agent_test.go | 28 ++++++- pkg/agent/config.go | 19 +++++ pkg/agent/ip.go | 137 +++++++++++++++++++++++++++++++ pkg/agent/ip_test.go | 139 ++++++++++++++++++++++++++++++++ pkg/exporter/grpc_proto_test.go | 67 +++++++++++++++ pkg/exporter/ipfix.go | 3 + pkg/exporter/proto.go | 13 +++ pkg/flow/account.go | 6 +- pkg/flow/account_test.go | 12 +-- pkg/flow/decorator.go | 22 +++++ pkg/flow/record.go | 7 +- pkg/flow/tracer_map.go | 5 +- pkg/grpc/grpc_test.go | 6 ++ pkg/pbflow/flow.pb.go | 89 +++++++++++--------- pkg/test/export_fake.go | 1 + proto/flow.proto | 3 + 20 files changed, 540 insertions(+), 75 deletions(-) create mode 100644 pkg/agent/ip.go create mode 100644 pkg/agent/ip_test.go create mode 100644 pkg/exporter/grpc_proto_test.go create mode 100644 pkg/flow/decorator.go diff --git a/Makefile b/Makefile index 65436e45b..b944500f2 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ IMG_SHA = $(IMAGE_TAG_BASE):$(BUILD_SHA) LOCAL_GENERATOR_IMAGE ?= ebpf-generator:latest CILIUM_EBPF_VERSION := v0.8.1 -GOLANGCI_LINT_VERSION = v1.46.2 +GOLANGCI_LINT_VERSION = v1.50.1 CLANG ?= clang CFLAGS := -O2 -g -Wall -Werror $(CFLAGS) diff --git a/docs/architecture.md b/docs/architecture.md index d45ab0640..3e799481c 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -23,5 +23,7 @@ flowchart TD DD --> |"chan []*flow.Record"| CL(flow.CapacityLimiter) - CL --> |"chan []*flow.Record"| EX("export.GRPCProto
or
export.KafkaProto") + CL --> |"chan []*flow.Record"| DC(flow.Decorator) + + DC --> |"chan []*flow.Record"| EX("export.GRPCProto
or
export.KafkaProto") ``` \ No newline at end of file diff --git a/docs/config.md b/docs/config.md index b89986e15..5fec13f3c 100644 --- a/docs/config.md +++ b/docs/config.md @@ -5,6 +5,14 @@ The following environment variables are available to configure the NetObserv eBF * `EXPORT` (default: `grpc`). Flows' exporter protocol. Accepted values are: `grpc` or `kafka` or `ipfix+tcp` or `ipfix+udp`. * `FLOWS_TARGET_HOST` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Host name or IP of the target Flow collector. * `FLOWS_TARGET_PORT` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Port of the target flow collector. +* `AGENT_IP` (optional). Allows overriding the reported Agent IP address on each flow. +* `AGENT_IP_IFACE` (default: `external`). Specifies which interface should the agent pick the IP + address from in order to report it in the AgentIP field on each flow. Accepted values are: + `external` (default), `local`, or `name:` (e.g. `name:eth0`). If the `AGENT_IP` + configuration property is set, this property has no effect. +* `AGENT_IP_TYPE` (default: `any`). Specifies which type of IP address (IPv4 or IPv6 or any) should + the agent report in the AgentID field of each flow. Accepted values are: `any` (default), `ipv4`, + `ipv6`. If the `AGENT_IP` configuration property is set, this property has no effect. * `INTERFACES` (optional). Comma-separated list of the interface names from where flows will be collected. If empty, the agent will use all the interfaces in the system, excepting the ones listed in the `EXCLUDE_INTERFACES` variable. diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index f0f116f72..27df1fd65 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "net" "time" "github.com/cilium/ebpf/ringbuf" @@ -65,6 +66,10 @@ type Flows struct { accounter *flow.Accounter exporter flowExporter + // elements used to decorate flows with extra information + interfaceNamer flow.InterfaceNamer + agentIP net.IP + status Status } @@ -101,6 +106,13 @@ func FlowsAgent(cfg *Config) (*Flows, error) { informer = ifaces.NewWatcher(cfg.BuffersLength) } + alog.Debug("acquiring Agent IP") + agentIP, err := fetchAgentIP(cfg) + if err != nil { + return nil, fmt.Errorf("acquiring Agent IP: %w", err) + } + alog.Debug("agent IP: " + agentIP.String()) + // configure selected exporter exportFunc, err := buildFlowExporter(cfg) if err != nil { @@ -119,7 +131,7 @@ func FlowsAgent(cfg *Config) (*Flows, error) { return nil, err } - return flowsAgent(cfg, informer, fetcher, exportFunc) + return flowsAgent(cfg, informer, fetcher, exportFunc, agentIP) } // flowsAgent is a private constructor with injectable dependencies, usable for tests @@ -127,6 +139,7 @@ func flowsAgent(cfg *Config, informer ifaces.Informer, fetcher ebpfFlowFetcher, exporter flowExporter, + agentIP net.IP, ) (*Flows, error) { // configure allow/deny interfaces filter filter, err := initInterfaceFilter(cfg.Interfaces, cfg.ExcludeInterfaces) @@ -144,19 +157,21 @@ func flowsAgent(cfg *Config, return iface } - mapTracer := flow.NewMapTracer(fetcher, interfaceNamer, cfg.CacheActiveTimeout) + mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout) rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout) accounter := flow.NewAccounter( - cfg.CacheMaxFlows, cfg.CacheActiveTimeout, interfaceNamer, time.Now, monotime.Now) + cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now) return &Flows{ - ebpf: fetcher, - exporter: exporter, - interfaces: registerer, - filter: filter, - cfg: cfg, - mapTracer: mapTracer, - rbTracer: rbTracer, - accounter: accounter, + ebpf: fetcher, + exporter: exporter, + interfaces: registerer, + filter: filter, + cfg: cfg, + mapTracer: mapTracer, + rbTracer: rbTracer, + accounter: accounter, + agentIP: agentIP, + interfaceNamer: interfaceNamer, }, nil } @@ -345,6 +360,9 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal, erro limiter := node.AsMiddle((&flow.CapacityLimiter{}).Limit, node.ChannelBufferLen(f.cfg.BuffersLength)) + decorator := node.AsMiddle(flow.Decorate(f.agentIP, f.interfaceNamer), + node.ChannelBufferLen(f.cfg.BuffersLength)) + ebl := f.cfg.ExporterBufferLength if ebl == 0 { ebl = f.cfg.BuffersLength @@ -365,7 +383,9 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal, erro mapTracer.SendsTo(limiter) accounter.SendsTo(limiter) } - limiter.SendsTo(export) + limiter.SendsTo(decorator) + decorator.SendsTo(export) + alog.Debug("starting graph") mapTracer.Start() rbTracer.Start() diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index f2fc77710..f84475651 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -2,6 +2,7 @@ package agent import ( "context" + "net" "testing" "time" @@ -13,6 +14,8 @@ import ( "github.com/stretchr/testify/require" ) +var agentIP = "192.168.1.13" + const timeout = 2 * time.Second func TestFlowsAgent_InvalidConfigs(t *testing.T) { @@ -171,6 +174,28 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) { assert.Lenf(t, key1Flows, 2, "both key1 flows should have been forwarded: %#v", key1Flows) } +func TestFlowsAgent_Decoration(t *testing.T) { + export := testAgent(t, &Config{ + CacheActiveTimeout: 10 * time.Millisecond, + CacheMaxFlows: 100, + }) + + exported := export.Get(t, timeout) + assert.Len(t, exported, 3) + + // Tests that the decoration stage has been properly executed. It should + // add the interface name and the agent IP + for _, f := range exported { + assert.Equal(t, agentIP, f.AgentIP.String()) + switch f.RecordKey { + case key1, key2: + assert.Equal(t, "foo", f.Interface) + default: + assert.Equal(t, "bar", f.Interface) + } + } +} + func testAgent(t *testing.T, cfg *Config) *test.ExporterFake { ebpf := test.NewTracerFake() export := test.NewExporterFake() @@ -178,7 +203,8 @@ func testAgent(t *testing.T, cfg *Config) *test.ExporterFake { test.SliceInformerFake{ {Name: "foo", Index: 3}, {Name: "bar", Index: 4}, - }, ebpf, export.Export) + }, ebpf, export.Export, + net.ParseIP(agentIP)) require.NoError(t, err) go func() { diff --git a/pkg/agent/config.go b/pkg/agent/config.go index 90ccf3978..c2edd7edf 100644 --- a/pkg/agent/config.go +++ b/pkg/agent/config.go @@ -12,9 +12,28 @@ const ( DirectionIngress = "ingress" DirectionEgress = "egress" DirectionBoth = "both" + + IPTypeAny = "any" + IPTypeIPV4 = "ipv4" + IPTypeIPV6 = "ipv6" + + IPIfaceExternal = "external" + IPIfaceLocal = "local" + IPIfaceNamedPrefix = "name:" ) type Config struct { + // AgentIP allows overriding the reported Agent IP address on each flow. + AgentIP string `env:"AGENT_IP"` + // AgentIPIface specifies which interface should the agent pick the IP address from in order to + // report it in the AgentIP field on each flow. Accepted values are: external (default), local, + // or name: (e.g. name:eth0). + // If the AgentIP configuration property is set, this property has no effect. + AgentIPIface string `env:"AGENT_IP_IFACE" envDefault:"external"` + // AgentIPType specifies which type of IP address (IPv4 or IPv6 or any) should the agent report + // in the AgentID field of each flow. Accepted values are: any (default), ipv4, ipv6. + // If the AgentIP configuration property is set, this property has no effect. + AgentIPType string `env:"AGENT_IP_TYPE" envDefault:"any"` // Export selects the flows' exporter protocol. Accepted values are: grpc (default) or kafka // or ipfix+udp or ipfix+tcp. Export string `env:"EXPORT" envDefault:"grpc"` diff --git a/pkg/agent/ip.go b/pkg/agent/ip.go new file mode 100644 index 000000000..c41c86409 --- /dev/null +++ b/pkg/agent/ip.go @@ -0,0 +1,137 @@ +package agent + +import ( + "fmt" + "net" + "strings" +) + +// dependencies that can be injected from testing +var ( + interfaceByName = net.InterfaceByName + interfaceAddrs = net.InterfaceAddrs + dial = net.Dial + ifaceAddrs = func(iface *net.Interface) ([]net.Addr, error) { + return iface.Addrs() + } +) + +// fetchAgentIP guesses the non-loopback IP address of the Agent host, according to the +// user-provided configuration: +// - If AgentIP is provided, this value is used whatever is the real IP of the Agent. +// - AgentIPIface specifies which interface this function should look into in order to pickup an address. +// - AgentIPType specifies which type of IP address should the agent pickup ("any" to pickup whichever +// ipv4 or ipv6 address is found first) +func fetchAgentIP(cfg *Config) (net.IP, error) { + if cfg.AgentIP != "" { + if ip := net.ParseIP(cfg.AgentIP); ip != nil { + return ip, nil + } + return nil, fmt.Errorf("can't parse provided IP %v", cfg.AgentIP) + } + + if cfg.AgentIPType != IPTypeAny && + cfg.AgentIPType != IPTypeIPV6 && + cfg.AgentIPType != IPTypeIPV4 { + return nil, fmt.Errorf("invalid IP type %q. Valid values are: %s, %s or %s", + cfg.AgentIPType, IPTypeIPV4, IPTypeIPV6, IPTypeAny) + } + + switch cfg.AgentIPIface { + case IPIfaceLocal: + return fromLocal(cfg.AgentIPType) + case IPIfaceExternal: + return fromExternal(cfg.AgentIPType) + default: + if !strings.HasPrefix(cfg.AgentIPIface, IPIfaceNamedPrefix) { + return nil, fmt.Errorf( + "invalid IP interface %q. Valid values are: %s, %s or %s", + cfg.AgentIPIface, IPIfaceLocal, IPIfaceExternal, IPIfaceNamedPrefix) + } + return fromInterface(cfg.AgentIPIface[len(IPIfaceNamedPrefix):], cfg.AgentIPType) + } +} + +func fromInterface(ifName, ipType string) (net.IP, error) { + iface, err := interfaceByName(ifName) + if err != nil { + return nil, err + } + addrs, err := ifaceAddrs(iface) + if err != nil { + return nil, err + } + if ip, ok := findAddress(addrs, ipType); ok { + return ip, nil + } + return nil, fmt.Errorf("no matching %q addresses found at interface %v", ipType, ifName) +} + +func fromLocal(ipType string) (net.IP, error) { + addrs, err := interfaceAddrs() + if err != nil { + return nil, err + } + if ip, ok := findAddress(addrs, ipType); ok { + return ip, nil + } + return nil, fmt.Errorf("no matching local %q addresses found", ipType) +} + +func fromExternal(ipType string) (net.IP, error) { + // We don't really care about the existence or nonexistence of the addresses. + // This will just establish an external dialer where we can pickup the external + // host address + addrStr := "8.8.8.8:80" + if ipType == IPTypeIPV6 { + addrStr = "[2001:4860:4860::8888]:80" + } + conn, err := dial("udp", addrStr) + if err != nil { + return nil, fmt.Errorf("can't establish an external connection %w", err) + } + if addr, ok := conn.LocalAddr().(*net.UDPAddr); !ok { + return nil, fmt.Errorf("unexpected local address type %T for external connection", + conn.LocalAddr()) + } else if ip, ok := getIP(addr.IP, ipType); ok { + return ip, nil + } + return nil, fmt.Errorf("no matching %q external addresses found", ipType) +} + +func findAddress(addrs []net.Addr, ipType string) (net.IP, bool) { + for _, addr := range addrs { + if ipnet, ok := addr.(*net.IPNet); ok && ipnet != nil { + if ip, ok := getIP(ipnet.IP, ipType); ok { + return ip, true + } + } + } + return nil, false +} + +func getIP(pip net.IP, ipType string) (net.IP, bool) { + if pip == nil || pip.IsLoopback() { + return nil, false + } + switch ipType { + case IPTypeIPV4: + if ip := pip.To4(); ip != nil { + return ip, true + } + case IPTypeIPV6: + // as any IP4 address can be converted to IP6, we only return any + // address that can be converted to IP6 but not to IP4 + if ip := pip.To16(); ip != nil && pip.To4() == nil { + return ip, true + } + default: // Any + if ip := pip.To4(); ip != nil { + return ip, true + } + if ip := pip.To16(); ip != nil { + return ip, true + } + } + return nil, false +} diff --git a/pkg/agent/ip_test.go b/pkg/agent/ip_test.go new file mode 100644 index 000000000..3b9f69b15 --- /dev/null +++ b/pkg/agent/ip_test.go @@ -0,0 +1,139 @@ +package agent + +import ( + "errors" + "net" + "regexp" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +var ( + localIP4 = net.ParseIP("10.0.0.10") + localIP6 = net.ParseIP("2001:0db8::1111") + externalIP4 = net.ParseIP("84.88.89.90") + externalIP6 = net.ParseIP("2001:0db8::eeee") + testIFName = "teth1" + testIfIP4 = net.ParseIP("10.1.2.3") + testIfIP6 = net.ParseIP("2001:0db8::6666") + testIFName2 = "teth2" + testIf2IP6 = net.ParseIP("2001:0db8::6262") +) + +func TestAgentIP_Any(t *testing.T) { + mockIfaces() + type testCase struct { + dsc string + cfg Config + expect net.IP + } + + for _, tc := range []testCase{ + {dsc: "hardcoding IPv4 address", + cfg: Config{AgentIP: "192.168.1.13"}, + expect: net.IPv4(192, 168, 1, 13)}, + {dsc: "hardcoding IPv6 address", + cfg: Config{AgentIP: "2002:0db9::7336"}, + expect: net.ParseIP("2002:0db9::7336")}, + {dsc: "any local address", + cfg: Config{AgentIPIface: "local", AgentIPType: "any"}, + expect: localIP4}, + {dsc: "local IPv4 address", + cfg: Config{AgentIPIface: "local", AgentIPType: "ipv4"}, + expect: localIP4}, + {dsc: "local IPv6 address", + cfg: Config{AgentIPIface: "local", AgentIPType: "ipv6"}, + expect: localIP6}, + {dsc: "any external address", + cfg: Config{AgentIPIface: "external", AgentIPType: "any"}, + expect: externalIP4}, + {dsc: "external IPv4 address", + cfg: Config{AgentIPIface: "external", AgentIPType: "ipv4"}, + expect: externalIP4}, + {dsc: "external IPv6 address", + cfg: Config{AgentIPIface: "external", AgentIPType: "ipv6"}, + expect: externalIP6}, + {dsc: "any IP given an interface name", + cfg: Config{AgentIPIface: "name:" + testIFName, AgentIPType: "any"}, + expect: testIfIP4}, + {dsc: "IPv4 address given an interface name", + cfg: Config{AgentIPIface: "name:" + testIFName, AgentIPType: "ipv4"}, + expect: testIfIP4}, + {dsc: "IPv6 address given an interface name", + cfg: Config{AgentIPIface: "name:" + testIFName, AgentIPType: "ipv6"}, + expect: testIfIP6}, + {dsc: "any IP given an IPV6-only interface name", + cfg: Config{AgentIPIface: "name:" + testIFName2, AgentIPType: "any"}, + expect: testIf2IP6}, + {dsc: "IPv6 address given an IPV6-only interface name", + cfg: Config{AgentIPIface: "name:" + testIFName2, AgentIPType: "ipv6"}, + expect: testIf2IP6}, + } { + t.Run(tc.dsc, func(t *testing.T) { + ip, err := fetchAgentIP(&tc.cfg) + require.NoError(t, err) + require.Truef(t, tc.expect.Equal(ip), "expected: %s. Got: %s", tc.expect, ip) + }) + } +} + +func mockIfaces() { + // mock local addresses retrieval + interfaceAddrs = func() ([]net.Addr, error) { + return []net.Addr{ + &net.IPNet{IP: net.ParseIP("127.0.0.1")}, + &net.IPNet{IP: localIP4}, + &net.IPNet{IP: localIP6}, + }, nil + } + // mock external address retrieval + dial = func(_, address string) (net.Conn, error) { + // IPv4 address + if regexp.MustCompile(`^\d+(\.\d+){3}(:\d+)?$`).MatchString(address) { + return &connMock{ip: externalIP4}, nil + } + return &connMock{ip: externalIP6}, nil + } + // mock interface retrieval by name + interfaceByName = func(name string) (*net.Interface, error) { + if name != testIFName && name != testIFName2 { + return nil, errors.New("unknown interface " + name) + } + return &net.Interface{ + Name: name, + }, nil + } + // mock test interface address retrieval + ifaceAddrs = func(iface *net.Interface) ([]net.Addr, error) { + switch iface.Name { + case testIFName: + return []net.Addr{ + &net.IPNet{IP: testIfIP4}, + &net.IPNet{IP: testIfIP6}, + }, nil + case testIFName2: + return []net.Addr{ + &net.IPNet{IP: testIf2IP6}, + }, nil + } + return iface.Addrs() + } +} + +type connMock struct { + ip net.IP +} + +func (c *connMock) LocalAddr() net.Addr { + return &net.UDPAddr{IP: c.ip} +} + +func (c *connMock) Read(_ []byte) (n int, err error) { panic("unexpected call") } +func (c *connMock) Write(_ []byte) (n int, err error) { panic("unexpected call") } +func (c *connMock) Close() error { panic("unexpected call") } +func (c *connMock) RemoteAddr() net.Addr { panic("unexpected call") } +func (c *connMock) SetDeadline(_ time.Time) error { panic("unexpected call") } +func (c *connMock) SetReadDeadline(_ time.Time) error { panic("unexpected call") } +func (c *connMock) SetWriteDeadline(_ time.Time) error { panic("unexpected call") } diff --git a/pkg/exporter/grpc_proto_test.go b/pkg/exporter/grpc_proto_test.go new file mode 100644 index 000000000..18f760ec3 --- /dev/null +++ b/pkg/exporter/grpc_proto_test.go @@ -0,0 +1,67 @@ +package exporter + +import ( + "fmt" + "net" + "testing" + "time" + + "github.com/mariomac/guara/pkg/test" + "github.com/netobserv/netobserv-ebpf-agent/pkg/flow" + "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc" + "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const timeout = 2 * time.Second + +func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) { + // start remote ingestor + port, err := test.FreeTCPPort() + require.NoError(t, err) + serverOut := make(chan *pbflow.Records) + _, err = grpc.StartCollector(port, serverOut) + require.NoError(t, err) + + // Start GRPCProto exporter stage + exporter, err := StartGRPCProto(fmt.Sprintf("127.0.0.1:%d", port)) + require.NoError(t, err) + + // Send some flows to the input of the exporter stage + flows := make(chan []*flow.Record, 10) + flows <- []*flow.Record{ + {AgentIP: net.ParseIP("10.9.8.7")}, + } + flows <- []*flow.Record{ + {RawRecord: flow.RawRecord{RecordKey: flow.RecordKey{EthProtocol: flow.IPv6Type}}, + AgentIP: net.ParseIP("8888::1111")}, + } + close(flows) + go exporter.ExportFlows(flows) + + var rs *pbflow.Records + select { + case rs = <-serverOut: + case <-time.After(timeout): + require.Fail(t, "timeout waiting for flows") + } + assert.Len(t, rs.Entries, 1) + r := rs.Entries[0] + assert.EqualValues(t, 0x0a090807, r.GetAgentIp().GetIpv4()) + select { + case rs = <-serverOut: + case <-time.After(timeout): + require.Fail(t, "timeout waiting for flows") + } + assert.Len(t, rs.Entries, 1) + r = rs.Entries[0] + assert.EqualValues(t, net.ParseIP("8888::1111"), r.GetAgentIp().GetIpv6()) + + select { + case rs = <-serverOut: + assert.Failf(t, "shouldn't have received any flow", "Got: %#v", rs) + default: + //ok! + } +} diff --git a/pkg/exporter/ipfix.go b/pkg/exporter/ipfix.go index 92f3be5ce..c6c33e6db 100644 --- a/pkg/exporter/ipfix.go +++ b/pkg/exporter/ipfix.go @@ -12,6 +12,9 @@ import ( var ilog = logrus.WithField("component", "exporter/IPFIXProto") +// TODO: encode also the equivalent of the Protobuf's AgentIP field in a format that is binary- +// compatible with OVN-K. + type IPFIX struct { hostPort string exporter *ipfixExporter.ExportingProcess diff --git a/pkg/exporter/proto.go b/pkg/exporter/proto.go index 2c86c0162..7a1075e9e 100644 --- a/pkg/exporter/proto.go +++ b/pkg/exporter/proto.go @@ -1,6 +1,9 @@ package exporter import ( + "encoding/binary" + "net" + "github.com/netobserv/netobserv-ebpf-agent/pkg/flow" "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" "google.golang.org/protobuf/types/known/timestamppb" @@ -56,6 +59,7 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record { Packets: uint64(fr.Packets), Interface: fr.Interface, Duplicate: fr.Duplicate, + AgentIp: agentIP(fr.AgentIP), } } @@ -88,6 +92,7 @@ func v6FlowToPB(fr *flow.Record) *pbflow.Record { Packets: uint64(fr.Packets), Interface: fr.Interface, Duplicate: fr.Duplicate, + AgentIp: agentIP(fr.AgentIP), } } @@ -101,3 +106,11 @@ func macToUint64(m *flow.MacAddr) uint64 { (uint64(m[1]) << 32) | (uint64(m[0]) << 40) } + +func agentIP(nip net.IP) *pbflow.IP { + if ip := nip.To4(); ip != nil { + return &pbflow.IP{IpFamily: &pbflow.IP_Ipv4{Ipv4: binary.BigEndian.Uint32(ip)}} + } + // IPv6 address + return &pbflow.IP{IpFamily: &pbflow.IP_Ipv6{Ipv6: nip}} +} diff --git a/pkg/flow/account.go b/pkg/flow/account.go index f219bf897..59bb36cda 100644 --- a/pkg/flow/account.go +++ b/pkg/flow/account.go @@ -16,7 +16,6 @@ type Accounter struct { entries map[RecordKey]*RecordMetrics clock func() time.Time monoClock func() time.Duration - namer InterfaceNamer } var alog = logrus.WithField("component", "flow/Accounter") @@ -24,7 +23,7 @@ var alog = logrus.WithField("component", "flow/Accounter") // NewAccounter creates a new Accounter. // The cache has no limit and it's assumed that eviction is done by the caller. func NewAccounter( - maxEntries int, evictTimeout time.Duration, ifaceNamer InterfaceNamer, + maxEntries int, evictTimeout time.Duration, clock func() time.Time, monoClock func() time.Duration, ) *Accounter { @@ -32,7 +31,6 @@ func NewAccounter( maxEntries: maxEntries, evictTimeout: evictTimeout, entries: map[RecordKey]*RecordMetrics{}, - namer: ifaceNamer, clock: clock, monoClock: monoClock, } @@ -86,7 +84,7 @@ func (c *Accounter) evict(entries map[RecordKey]*RecordMetrics, evictor chan<- [ monotonicNow := uint64(c.monoClock()) records := make([]*Record, 0, len(entries)) for key, metrics := range entries { - records = append(records, NewRecord(key, *metrics, now, monotonicNow, c.namer)) + records = append(records, NewRecord(key, *metrics, now, monotonicNow)) } alog.WithField("numEntries", len(records)).Debug("records evicted from userspace accounter") evictor <- records diff --git a/pkg/flow/account_test.go b/pkg/flow/account_test.go index aea60217d..2bf2bf048 100644 --- a/pkg/flow/account_test.go +++ b/pkg/flow/account_test.go @@ -37,9 +37,7 @@ var k3 = RecordKey{ func TestEvict_MaxEntries(t *testing.T) { // GIVEN an accounter now := time.Date(2022, 8, 23, 16, 33, 22, 0, time.UTC) - acc := NewAccounter(2, time.Hour, func(ifIndex int) string { - return "foo" - }, func() time.Time { + acc := NewAccounter(2, time.Hour, func() time.Time { return now }, func() time.Duration { return 1000 @@ -103,7 +101,6 @@ func TestEvict_MaxEntries(t *testing.T) { }, TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond), TimeFlowEnd: now.Add(-(1000 - 789) * time.Nanosecond), - Interface: "foo", }, k2: { RawRecord: RawRecord{ @@ -114,7 +111,6 @@ func TestEvict_MaxEntries(t *testing.T) { }, TimeFlowStart: now.Add(-(1000 - 456) * time.Nanosecond), TimeFlowEnd: now.Add(-(1000 - 456) * time.Nanosecond), - Interface: "foo", }, }, received) } @@ -122,9 +118,7 @@ func TestEvict_MaxEntries(t *testing.T) { func TestEvict_Period(t *testing.T) { // GIVEN an accounter now := time.Date(2022, 8, 23, 16, 33, 22, 0, time.UTC) - acc := NewAccounter(200, 20*time.Millisecond, func(ifIndex int) string { - return "foo" - }, func() time.Time { + acc := NewAccounter(200, 20*time.Millisecond, func() time.Time { return now }, func() time.Duration { return 1000 @@ -182,7 +176,6 @@ func TestEvict_Period(t *testing.T) { EndMonoTimeNs: 789, }, }, - Interface: "foo", TimeFlowStart: now.Add(-1000 + 123), TimeFlowEnd: now.Add(-1000 + 789), }, *records[0]) @@ -198,7 +191,6 @@ func TestEvict_Period(t *testing.T) { EndMonoTimeNs: 1456, }, }, - Interface: "foo", TimeFlowStart: now.Add(-1000 + 1123), TimeFlowEnd: now.Add(-1000 + 1456), }, *records[0]) diff --git a/pkg/flow/decorator.go b/pkg/flow/decorator.go new file mode 100644 index 000000000..1975c6089 --- /dev/null +++ b/pkg/flow/decorator.go @@ -0,0 +1,22 @@ +package flow + +import ( + "net" +) + +type InterfaceNamer func(ifIndex int) string + +// Decorate adds to the flows extra metadata fields that are not directly fetched by eBPF: +// - The interface name (corresponding to the interface index in the flow). +// - The IP address of the agent host. +func Decorate(agentIP net.IP, ifaceNamer InterfaceNamer) func(in <-chan []*Record, out chan<- []*Record) { + return func(in <-chan []*Record, out chan<- []*Record) { + for flows := range in { + for _, flow := range flows { + flow.Interface = ifaceNamer(int(flow.IFIndex)) + flow.AgentIP = agentIP + } + out <- flows + } + } +} diff --git a/pkg/flow/record.go b/pkg/flow/record.go index 625e561cb..665f4ed25 100644 --- a/pkg/flow/record.go +++ b/pkg/flow/record.go @@ -91,16 +91,16 @@ type Record struct { // "exclude from aggregation". Otherwise rates, sums, etc... values would be multiplied by the // number of interfaces this flow is observed from. Duplicate bool -} -type InterfaceNamer func(ifIndex int) string + // AgentIP provides information about the source of the flow (the Agent that traced it) + AgentIP net.IP +} func NewRecord( key RecordKey, metrics RecordMetrics, currentTime time.Time, monotonicCurrentTime uint64, - namer InterfaceNamer, ) *Record { startDelta := time.Duration(monotonicCurrentTime - metrics.StartMonoTimeNs) endDelta := time.Duration(monotonicCurrentTime - metrics.EndMonoTimeNs) @@ -109,7 +109,6 @@ func NewRecord( RecordKey: key, RecordMetrics: metrics, }, - Interface: namer(int(key.IFIndex)), TimeFlowStart: currentTime.Add(-startDelta), TimeFlowEnd: currentTime.Add(-endDelta), } diff --git a/pkg/flow/tracer_map.go b/pkg/flow/tracer_map.go index 510bdea46..8f58eb320 100644 --- a/pkg/flow/tracer_map.go +++ b/pkg/flow/tracer_map.go @@ -16,7 +16,6 @@ var mtlog = logrus.WithField("component", "flow.MapTracer") // a flow Record structure, and performs the accumulation of each perCPU-record into a single flow type MapTracer struct { mapFetcher mapFetcher - interfaceNamer InterfaceNamer evictionTimeout time.Duration // manages the access to the eviction routines, avoiding two evictions happening at the same time evictionCond *sync.Cond @@ -27,10 +26,9 @@ type mapFetcher interface { LookupAndDeleteMap() map[RecordKey][]RecordMetrics } -func NewMapTracer(fetcher mapFetcher, namer InterfaceNamer, evictionTimeout time.Duration) *MapTracer { +func NewMapTracer(fetcher mapFetcher, evictionTimeout time.Duration) *MapTracer { return &MapTracer{ mapFetcher: fetcher, - interfaceNamer: namer, evictionTimeout: evictionTimeout, lastEvictionNs: uint64(monotime.Now()), evictionCond: sync.NewCond(&sync.Mutex{}), @@ -108,7 +106,6 @@ func (m *MapTracer) evictFlows(forwardFlows chan<- []*Record) { aggregatedMetrics, currentTime, uint64(monotonicTimeNow), - m.interfaceNamer, )) } m.lastEvictionNs = laterFlowNs diff --git a/pkg/grpc/grpc_test.go b/pkg/grpc/grpc_test.go index 8bd094f53..fef4b3495 100644 --- a/pkg/grpc/grpc_test.go +++ b/pkg/grpc/grpc_test.go @@ -36,6 +36,8 @@ func TestGRPCCommunication(t *testing.T) { DstAddr: &pbflow.IP{ IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x55667788}, }, + }, AgentIp: &pbflow.IP{ + IpFamily: &pbflow.IP_Ipv4{Ipv4: 0xaabbccdd}, }}}, }) require.NoError(t, err) @@ -48,6 +50,8 @@ func TestGRPCCommunication(t *testing.T) { DstAddr: &pbflow.IP{ IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x88776655}, }, + }, AgentIp: &pbflow.IP{ + IpFamily: &pbflow.IP_Ipv4{Ipv4: 0xddccbbaa}, }}}, }) require.NoError(t, err) @@ -65,6 +69,7 @@ func TestGRPCCommunication(t *testing.T) { assert.EqualValues(t, 456, r.Bytes) assert.EqualValues(t, 0x11223344, r.GetNetwork().GetSrcAddr().GetIpv4()) assert.EqualValues(t, 0x55667788, r.GetNetwork().GetDstAddr().GetIpv4()) + assert.EqualValues(t, 0xaabbccdd, r.GetAgentIp().GetIpv4()) select { case rs = <-serverOut: case <-time.After(timeout): @@ -76,6 +81,7 @@ func TestGRPCCommunication(t *testing.T) { assert.EqualValues(t, 101, r.Bytes) assert.EqualValues(t, 0x44332211, r.GetNetwork().GetSrcAddr().GetIpv4()) assert.EqualValues(t, 0x88776655, r.GetNetwork().GetDstAddr().GetIpv4()) + assert.EqualValues(t, 0xddccbbaa, r.GetAgentIp().GetIpv4()) select { case rs = <-serverOut: diff --git a/pkg/pbflow/flow.pb.go b/pkg/pbflow/flow.pb.go index 27342aadd..4458513eb 100644 --- a/pkg/pbflow/flow.pb.go +++ b/pkg/pbflow/flow.pb.go @@ -176,6 +176,8 @@ type Record struct { // if true, the same flow has been recorded via another interface. // From all the duplicate flows, one will set this value to false and the rest will be true. Duplicate bool `protobuf:"varint,11,opt,name=duplicate,proto3" json:"duplicate,omitempty"` + // Agent IP address to help identifying the source of the flow + AgentIp *IP `protobuf:"bytes,12,opt,name=agent_ip,json=agentIp,proto3" json:"agent_ip,omitempty"` } func (x *Record) Reset() { @@ -287,6 +289,13 @@ func (x *Record) GetDuplicate() bool { return false } +func (x *Record) GetAgentIp() *IP { + if x != nil { + return x.AgentIp + } + return nil +} + type DataLink struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -553,7 +562,7 @@ var file_proto_flow_proto_rawDesc = []byte{ 0x07, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x12, 0x28, 0x0a, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x69, - 0x65, 0x73, 0x22, 0xd7, 0x03, 0x0a, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x21, 0x0a, + 0x65, 0x73, 0x22, 0xfe, 0x03, 0x0a, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x65, 0x74, 0x68, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0b, 0x65, 0x74, 0x68, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x2f, 0x0a, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, @@ -582,34 +591,37 @@ var file_proto_flow_proto_rawDesc = []byte{ 0x12, 0x1c, 0x0a, 0x09, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x66, 0x61, 0x63, 0x65, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x66, 0x61, 0x63, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x64, 0x75, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x18, 0x0b, 0x20, 0x01, 0x28, - 0x08, 0x52, 0x09, 0x64, 0x75, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x22, 0x3c, 0x0a, 0x08, - 0x44, 0x61, 0x74, 0x61, 0x4c, 0x69, 0x6e, 0x6b, 0x12, 0x17, 0x0a, 0x07, 0x73, 0x72, 0x63, 0x5f, - 0x6d, 0x61, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x73, 0x72, 0x63, 0x4d, 0x61, - 0x63, 0x12, 0x17, 0x0a, 0x07, 0x64, 0x73, 0x74, 0x5f, 0x6d, 0x61, 0x63, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x04, 0x52, 0x06, 0x64, 0x73, 0x74, 0x4d, 0x61, 0x63, 0x22, 0x57, 0x0a, 0x07, 0x4e, 0x65, - 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x12, 0x25, 0x0a, 0x08, 0x73, 0x72, 0x63, 0x5f, 0x61, 0x64, 0x64, - 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, - 0x2e, 0x49, 0x50, 0x52, 0x07, 0x73, 0x72, 0x63, 0x41, 0x64, 0x64, 0x72, 0x12, 0x25, 0x0a, 0x08, - 0x64, 0x73, 0x74, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, - 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x50, 0x52, 0x07, 0x64, 0x73, 0x74, 0x41, - 0x64, 0x64, 0x72, 0x22, 0x3d, 0x0a, 0x02, 0x49, 0x50, 0x12, 0x14, 0x0a, 0x04, 0x69, 0x70, 0x76, - 0x34, 0x18, 0x01, 0x20, 0x01, 0x28, 0x07, 0x48, 0x00, 0x52, 0x04, 0x69, 0x70, 0x76, 0x34, 0x12, - 0x14, 0x0a, 0x04, 0x69, 0x70, 0x76, 0x36, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, - 0x04, 0x69, 0x70, 0x76, 0x36, 0x42, 0x0b, 0x0a, 0x09, 0x69, 0x70, 0x5f, 0x66, 0x61, 0x6d, 0x69, - 0x6c, 0x79, 0x22, 0x5d, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x12, - 0x19, 0x0a, 0x08, 0x73, 0x72, 0x63, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0d, 0x52, 0x07, 0x73, 0x72, 0x63, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x64, 0x73, - 0x74, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x64, 0x73, - 0x74, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, - 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, - 0x6c, 0x2a, 0x24, 0x0a, 0x09, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0b, - 0x0a, 0x07, 0x49, 0x4e, 0x47, 0x52, 0x45, 0x53, 0x53, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x45, - 0x47, 0x52, 0x45, 0x53, 0x53, 0x10, 0x01, 0x32, 0x3e, 0x0a, 0x09, 0x43, 0x6f, 0x6c, 0x6c, 0x65, - 0x63, 0x74, 0x6f, 0x72, 0x12, 0x31, 0x0a, 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x0f, 0x2e, 0x70, - 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x1a, 0x16, 0x2e, - 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, - 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, 0x70, 0x62, 0x66, - 0x6c, 0x6f, 0x77, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x08, 0x52, 0x09, 0x64, 0x75, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x08, + 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x70, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, + 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x50, 0x52, 0x07, 0x61, 0x67, 0x65, 0x6e, + 0x74, 0x49, 0x70, 0x22, 0x3c, 0x0a, 0x08, 0x44, 0x61, 0x74, 0x61, 0x4c, 0x69, 0x6e, 0x6b, 0x12, + 0x17, 0x0a, 0x07, 0x73, 0x72, 0x63, 0x5f, 0x6d, 0x61, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x06, 0x73, 0x72, 0x63, 0x4d, 0x61, 0x63, 0x12, 0x17, 0x0a, 0x07, 0x64, 0x73, 0x74, 0x5f, + 0x6d, 0x61, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x64, 0x73, 0x74, 0x4d, 0x61, + 0x63, 0x22, 0x57, 0x0a, 0x07, 0x4e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x12, 0x25, 0x0a, 0x08, + 0x73, 0x72, 0x63, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, + 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x50, 0x52, 0x07, 0x73, 0x72, 0x63, 0x41, + 0x64, 0x64, 0x72, 0x12, 0x25, 0x0a, 0x08, 0x64, 0x73, 0x74, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, + 0x50, 0x52, 0x07, 0x64, 0x73, 0x74, 0x41, 0x64, 0x64, 0x72, 0x22, 0x3d, 0x0a, 0x02, 0x49, 0x50, + 0x12, 0x14, 0x0a, 0x04, 0x69, 0x70, 0x76, 0x34, 0x18, 0x01, 0x20, 0x01, 0x28, 0x07, 0x48, 0x00, + 0x52, 0x04, 0x69, 0x70, 0x76, 0x34, 0x12, 0x14, 0x0a, 0x04, 0x69, 0x70, 0x76, 0x36, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x04, 0x69, 0x70, 0x76, 0x36, 0x42, 0x0b, 0x0a, 0x09, + 0x69, 0x70, 0x5f, 0x66, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x22, 0x5d, 0x0a, 0x09, 0x54, 0x72, 0x61, + 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x72, 0x63, 0x5f, 0x70, 0x6f, + 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x73, 0x72, 0x63, 0x50, 0x6f, 0x72, + 0x74, 0x12, 0x19, 0x0a, 0x08, 0x64, 0x73, 0x74, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0d, 0x52, 0x07, 0x64, 0x73, 0x74, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x1a, 0x0a, 0x08, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2a, 0x24, 0x0a, 0x09, 0x44, 0x69, 0x72, 0x65, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0b, 0x0a, 0x07, 0x49, 0x4e, 0x47, 0x52, 0x45, 0x53, 0x53, + 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x45, 0x47, 0x52, 0x45, 0x53, 0x53, 0x10, 0x01, 0x32, 0x3e, + 0x0a, 0x09, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x12, 0x31, 0x0a, 0x04, 0x53, + 0x65, 0x6e, 0x64, 0x12, 0x0f, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x52, 0x65, 0x63, + 0x6f, 0x72, 0x64, 0x73, 0x1a, 0x16, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x43, 0x6f, + 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x0a, + 0x5a, 0x08, 0x2e, 0x2f, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -645,15 +657,16 @@ var file_proto_flow_proto_depIdxs = []int32{ 4, // 4: pbflow.Record.data_link:type_name -> pbflow.DataLink 5, // 5: pbflow.Record.network:type_name -> pbflow.Network 7, // 6: pbflow.Record.transport:type_name -> pbflow.Transport - 6, // 7: pbflow.Network.src_addr:type_name -> pbflow.IP - 6, // 8: pbflow.Network.dst_addr:type_name -> pbflow.IP - 2, // 9: pbflow.Collector.Send:input_type -> pbflow.Records - 1, // 10: pbflow.Collector.Send:output_type -> pbflow.CollectorReply - 10, // [10:11] is the sub-list for method output_type - 9, // [9:10] is the sub-list for method input_type - 9, // [9:9] is the sub-list for extension type_name - 9, // [9:9] is the sub-list for extension extendee - 0, // [0:9] is the sub-list for field type_name + 6, // 7: pbflow.Record.agent_ip:type_name -> pbflow.IP + 6, // 8: pbflow.Network.src_addr:type_name -> pbflow.IP + 6, // 9: pbflow.Network.dst_addr:type_name -> pbflow.IP + 2, // 10: pbflow.Collector.Send:input_type -> pbflow.Records + 1, // 11: pbflow.Collector.Send:output_type -> pbflow.CollectorReply + 11, // [11:12] is the sub-list for method output_type + 10, // [10:11] is the sub-list for method input_type + 10, // [10:10] is the sub-list for extension type_name + 10, // [10:10] is the sub-list for extension extendee + 0, // [0:10] is the sub-list for field type_name } func init() { file_proto_flow_proto_init() } diff --git a/pkg/test/export_fake.go b/pkg/test/export_fake.go index 34c8b1304..00412f108 100644 --- a/pkg/test/export_fake.go +++ b/pkg/test/export_fake.go @@ -26,6 +26,7 @@ func (ef *ExporterFake) Export(in <-chan []*flow.Record) { } func (ef *ExporterFake) Get(t *testing.T, timeout time.Duration) []*flow.Record { + t.Helper() select { case <-time.After(timeout): t.Fatalf("timeout %s while waiting for a message to be exported", timeout) diff --git a/proto/flow.proto b/proto/flow.proto index 6c34dabed..8b5994eec 100644 --- a/proto/flow.proto +++ b/proto/flow.proto @@ -37,6 +37,9 @@ message Record { // if true, the same flow has been recorded via another interface. // From all the duplicate flows, one will set this value to false and the rest will be true. bool duplicate = 11; + + // Agent IP address to help identifying the source of the flow + IP agent_ip = 12; } message DataLink {