Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-1471 gRPC export for packet capture #291

Merged
merged 13 commits into from
Mar 22, 2024
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ generate: prereqs ## Generate artifacts of the code repo (pkg/ebpf and pkg/proto
go generate ./pkg/...
@echo "### Generating gRPC and Protocol Buffers code"
PATH="$(shell pwd)/protoc/bin:$$PATH" protoc --go_out=pkg --go-grpc_out=pkg proto/flow.proto
PATH="$(shell pwd)/protoc/bin:$$PATH" protoc --go_out=pkg --go-grpc_out=pkg proto/packet.proto

.PHONY: docker-generate
docker-generate: ## Create the container that generates the eBPF binaries
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ configured by our [Network Observability Operator](https://github.com/netobserv/
Anyway you can run it directly as an executable from your command line:

```
export FLOWS_TARGET_HOST=...
export FLOWS_TARGET_PORT=...
export HOST=...
export PORT=...
sudo -E bin/netobserv-ebpf-agent
```

Expand Down
4 changes: 2 additions & 2 deletions deployments/flp-daemonset-cap.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ spec:
- SYS_RESOURCE
runAsUser: 0
env:
- name: FLOWS_TARGET_HOST
- name: HOST
valueFrom:
fieldRef:
fieldPath: status.hostIP
- name: FLOWS_TARGET_PORT
- name: PORT
value: "9999"
---
apiVersion: apps/v1
Expand Down
4 changes: 2 additions & 2 deletions deployments/flp-daemonset.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ spec:
privileged: true
runAsUser: 0
env:
- name: FLOWS_TARGET_HOST
- name: HOST
valueFrom:
fieldRef:
fieldPath: status.hostIP
- name: FLOWS_TARGET_PORT
- name: PORT
value: "9999"
---
apiVersion: apps/v1
Expand Down
4 changes: 2 additions & 2 deletions deployments/flp-service.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ spec:
privileged: true
runAsUser: 0
env:
- name: FLOWS_TARGET_HOST
- name: HOST
value: "flp"
- name: FLOWS_TARGET_PORT
- name: PORT
value: "9999"
---
apiVersion: v1
Expand Down
4 changes: 2 additions & 2 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ _Please also refer to the file [config.go](../pkg/agent/config.go) which is the
The following environment variables are available to configure the NetObserv eBFP Agent:

* `EXPORT` (default: `grpc`). Flows' exporter protocol. Accepted values are: `grpc`, `kafka`, `ipfix+udp`, `ipfix+tcp` or `direct-flp`. In `direct-flp` mode, [flowlogs-pipeline](https://github.com/netobserv/flowlogs-pipeline) is run internally from the agent, allowing more filtering, transformations and exporting options.
* `FLOWS_TARGET_HOST` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Host name or IP of the target Flow collector.
* `FLOWS_TARGET_PORT` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Port of the target flow collector.
* `HOST` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Host name or IP of the Flow collector.
* `PORT` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Port of the flow collector.
* `GRPC_MESSAGE_MAX_FLOWS` (default: `10000`). Specifies the limit, in number of flows, of each GRPC
message. Messages larger than that number will be split and submitted sequentially.
* `AGENT_IP` (optional). Allows overriding the reported Agent IP address on each flow.
Expand Down
4 changes: 2 additions & 2 deletions e2e/cluster/base/04-agent.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ spec:
value: 200ms
- name: LOG_LEVEL
value: debug
- name: FLOWS_TARGET_HOST
- name: HOST
valueFrom:
fieldRef:
fieldPath: status.hostIP
- name: FLOWS_TARGET_PORT
- name: PORT
value: "9999"
volumeMounts:
- name: bpf-kernel-debug
Expand Down
4 changes: 2 additions & 2 deletions e2e/ipfix/manifests/30-agent.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ spec:
value: 200ms
- name: LOG_LEVEL
value: debug
- name: FLOWS_TARGET_HOST
- name: HOST
valueFrom:
fieldRef:
fieldPath: status.hostIP
- name: FLOWS_TARGET_PORT
- name: PORT
value: "9999"
volumeMounts:
- name: bpf-kernel-debug
Expand Down
2 changes: 1 addition & 1 deletion examples/flowlogs-dump/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ go build -mod vendor -o bin/flowlogs-dump-collector examples/flowlogs-dump/serve
```
Start the agent using:
```bash
sudo FLOWS_TARGET_HOST=127.0.0.1 FLOWS_TARGET_PORT=9999 ./bin/netobserv-ebpf-agent
sudo HOST=127.0.0.1 PORT=9999 ./bin/netobserv-ebpf-agent
```

Start the flowlogs-dump-collector using: (in a secondary shell)
Expand Down
2 changes: 1 addition & 1 deletion examples/flowlogs-dump/server/flowlogs-dump-collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"log"
"net"

"github.com/netobserv/netobserv-ebpf-agent/pkg/grpc"
grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
)

Expand Down
24 changes: 12 additions & 12 deletions examples/packetcapture-dump/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# packetcapture-client
# Packet Capture TCP Client

## How to run

Expand All @@ -12,14 +12,14 @@ Build the packetcapture-dump-collector (the client that receives full packets fr
```bash
go build -mod vendor -o bin/packetcapture-client examples/packetcapture-dump/client/packetcapture-client.go
```
Start the agent using:
Start the packetcapture-client using: (in a secondary shell)
```bash
sudo PCA_SERVER_PORT=9990 ENABLE_PCA=true PCA_FILTER=tcp,22 ./bin/netobserv-ebpf-agent
./bin/packetcapture-client -outfile=capture.pcap
```

Start the packetcapture-client using: (in a secondary shell)
Start the agent using:
```bash
./bin/packetcapture-client -outfile=capture.pcap
sudo HOST=localhost PORT=9990 ENABLE_PCA=true PCA_FILTER=tcp,22 ./bin/netobserv-ebpf-agent
```

You should see output such as:
Expand All @@ -29,13 +29,13 @@ By default, the read packets are printed on stdout.
To write to a pcap file use flag '-outfile=[filename]'
This creates a file [filename] and writes packets to it.
To view captured packets 'tcpdump -r [filename]'.

07-24-2023 07:58:59.264323 : Received Packet of length 24
07-24-2023 07:59:04.268965 : Received Packet of length 410
07-24-2023 07:59:04.269048 : Received Packet of length 644
07-24-2023 07:59:04.269087 : Received Packet of length 224
07-24-2023 07:59:04.269125 : Received Packet of length 82
07-24-2023 07:59:04.269173 : Received Packet of length 148
writting into capture.pcap
03-22-2024 10:48:44.941828 : Received Packet of length 136
03-22-2024 10:48:44.942901 : Received Packet of length 106
03-22-2024 10:48:44.943597 : Received Packet of length 110
03-22-2024 10:48:44.944182 : Received Packet of length 70
03-22-2024 10:48:44.944447 : Received Packet of length 70
03-22-2024 10:48:44.944644 : Received Packet of length 138
...
```

Expand Down
63 changes: 32 additions & 31 deletions examples/packetcapture-dump/client/packetcapture-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,24 @@ package main
import (
"flag"
"fmt"
"net"
"os"
"time"

"github.com/netobserv/netobserv-ebpf-agent/pkg/exporter"
grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/packet"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbpacket"

"github.com/google/gopacket/layers"
)

var (
PORT = flag.String("connect_port", "9990", "TCP port to connect to for packet stream")
HOST = flag.String("connect_host", "localhost", "Packet Capture Agent IP")
FILENAME = flag.String("outfile", "", "Create and write to Filename.pcap")
PORT = flag.Int("port", 9990, "gRPC collector port for packet stream")
FILENAME = flag.String("outfile", "", "Create and write to <Filename>.pcap")
)

// Setting Snapshot length to 0 sets it to maximum packet size
var snapshotlen uint32

func check(e error) {
if e != nil {
panic(e)
Expand All @@ -45,47 +52,41 @@ func main() {
fmt.Println("To view captured packets 'tcpdump -r [filename]'.")
flag.Parse()

tcpServer, err := net.ResolveTCPAddr("tcp", *HOST+":"+*PORT)

if err != nil {
println("ResolveTCPAddr failed:", err.Error())
os.Exit(1)
}
conn, err := net.DialTCP("tcp", nil, tcpServer)
flowPackets := make(chan *pbpacket.Packet, 100)
collector, err := grpc.StartCollector(*PORT, flowPackets)
if err != nil {
println("Dial failed:", err.Error())
fmt.Println("StartCollector failed:", err.Error())
os.Exit(1)
}

var f *os.File
if *FILENAME != "" {
f, err = os.Create(*FILENAME)
if err != nil {
fmt.Println("Create file failed:", err.Error())
os.Exit(1)
}
// write pcap file header
_, err = f.Write(exporter.GetPCAPFileHeader(snapshotlen, layers.LinkTypeEthernet))
if err != nil {
fmt.Println("Write file header failed:", err.Error())
os.Exit(1)
}
fmt.Println("writting into", *FILENAME)

defer f.Close()
for {
received := make([]byte, 65535)
n, err := conn.Read(received)
if err != nil {
println("Read data failed:", err.Error())
os.Exit(1)
}
_, err = f.Write(received[:n])
for fp := range flowPackets {
_, err = f.Write(fp.Pcap.Value)
check(err)
dt := time.Now()
fmt.Println(dt.Format("01-02-2006 15:04:05.000000"), ": Received Packet of length ", n)
fmt.Println(dt.Format("01-02-2006 15:04:05.000000"), ": Received Packet of length ", len(fp.Pcap.Value))
}
} else {
fmt.Println("into else")
for {
received := make([]byte, 65535)
n, err := conn.Read(received)
if err != nil {
println("Read data failed:", err.Error())
os.Exit(1)
}
fmt.Println(received[:n])
fmt.Println("printing stdout without saving in file")

for fp := range flowPackets {
fmt.Println(fp.Pcap.Value)
}
}
conn.Close()
collector.Close()
}
4 changes: 2 additions & 2 deletions examples/performance/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ spec:
value: call_error,cares_resolver,dns_resolver
- name: GRPC_DNS_RESOLVER
value: "ares"
- name: FLOWS_TARGET_HOST
- name: HOST
value: "packet-counter"
- name: FLOWS_TARGET_PORT
- name: PORT
value: "9999"
# resources:
# limits:
Expand Down
2 changes: 1 addition & 1 deletion examples/performance/server/packet-counter-collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"log"
"time"

"github.com/netobserv/netobserv-ebpf-agent/pkg/grpc"
grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
"github.com/paulbellamy/ratecounter"
)
Expand Down
4 changes: 2 additions & 2 deletions examples/systemd/netobserv-ebpf-agent
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
# /etc/default/netobserv-ebpf-agent

DIRECTION=both
FLOWS_TARGET_HOST=127.0.0.1
FLOWS_TARGET_PORT=9999
HOST=127.0.0.1
PORT=9999
17 changes: 10 additions & 7 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ type ebpfFlowFetcher interface {
func FlowsAgent(cfg *Config) (*Flows, error) {
alog.Info("initializing Flows agent")

// manage deprecated configs
manageDeprecatedConfigs(cfg)

// configure informer for new interfaces
var informer = configureInformer(cfg, alog)

Expand Down Expand Up @@ -286,16 +289,16 @@ func buildFlowExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*fl
case "direct-flp":
return buildDirectFLPExporter(cfg)
default:
return nil, fmt.Errorf("wrong export type %s. Admitted values are grpc, kafka", cfg.Export)
return nil, fmt.Errorf("wrong flow export type %s", cfg.Export)
}
}

func buildGRPCExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*flow.Record], error) {
if cfg.TargetHost == "" || cfg.TargetPort == 0 {
if cfg.Host == "" || cfg.Port == 0 {
return nil, fmt.Errorf("missing target host or port: %s:%d",
cfg.TargetHost, cfg.TargetPort)
cfg.Host, cfg.Port)
}
grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.GRPCMessageMaxFlows, m)
grpcExporter, err := exporter.StartGRPCProto(cfg.Host, cfg.Port, cfg.GRPCMessageMaxFlows, m)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -361,11 +364,11 @@ func buildKafkaExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*f
}

func buildIPFIXExporter(cfg *Config, proto string) (node.TerminalFunc[[]*flow.Record], error) {
if cfg.TargetHost == "" || cfg.TargetPort == 0 {
if cfg.Host == "" || cfg.Port == 0 {
return nil, fmt.Errorf("missing target host or port: %s:%d",
cfg.TargetHost, cfg.TargetPort)
cfg.Host, cfg.Port)
}
ipfix, err := exporter.StartIPFIXExporter(cfg.TargetHost, cfg.TargetPort, proto)
ipfix, err := exporter.StartIPFIXExporter(cfg.Host, cfg.Port, proto)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ func TestFlowsAgent_InvalidConfigs(t *testing.T) {
c: Config{Export: "foo"},
}, {
d: "GRPC: missing host",
c: Config{Export: "grpc", TargetPort: 3333},
c: Config{Export: "grpc", Port: 3333},
}, {
d: "GRPC: missing port",
c: Config{Export: "grpc", TargetHost: "flp"},
c: Config{Export: "grpc", Host: "flp"},
}, {
d: "Kafka: missing brokers",
c: Config{Export: "kafka"},
Expand Down
Loading
Loading