Skip to content

Commit

Permalink
add tcp flow stream exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
jpinsonneau committed Nov 27, 2023
1 parent 0c43b4a commit ce87754
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 1 deletion.
14 changes: 14 additions & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
return buildIPFIXExporter(cfg, "tcp")
case "direct-flp":
return buildDirectFLPExporter(cfg)
case "tcp":
return buildFlowStreamExporter(cfg)
default:
return nil, fmt.Errorf("wrong export type %s. Admitted values are grpc, kafka", cfg.Export)
}
Expand Down Expand Up @@ -339,6 +341,18 @@ func buildIPFIXExporter(cfg *Config, proto string) (node.TerminalFunc[[]*flow.Re
return ipfix.ExportFlows, nil
}

func buildFlowStreamExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
if cfg.TargetPort == 0 {
return nil, fmt.Errorf("missing port: %d", cfg.TargetPort)
}
flowStreamer, err := exporter.StartFlowSend(fmt.Sprintf("%d", cfg.TargetPort))
if err != nil {
return nil, err
}

return flowStreamer.ExportFlows, err
}

// Run a Flows agent. The function will keep running in the same thread
// until the passed context is canceled
func (f *Flows) Run(ctx context.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Config struct {
// If the AgentIP configuration property is set, this property has no effect.
AgentIPType string `env:"AGENT_IP_TYPE" envDefault:"any"`
// Export selects the flows' exporter protocol. Accepted values are: grpc (default), kafka,
// ipfix+udp, ipfix+tcp or direct-flp.
// ipfix+udp, ipfix+tcp, tcp or direct-flp.
Export string `env:"EXPORT" envDefault:"grpc"`
// TargetHost is the host name or IP of the target Flow collector, when the EXPORT variable is
// set to "grpc"
Expand Down
66 changes: 66 additions & 0 deletions pkg/exporter/flows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package exporter

import (
"encoding/json"
"net"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/netobserv-ebpf-agent/pkg/decode"
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
)

type FlowStream struct {
hostPort string
clientConn net.Conn
}

// WriteFlow writes the given flow data out to the file.
func writeFlow(record *config.GenericMap, conn net.Conn) error {
b, err := json.Marshal(record)
if err != nil {
plog.Fatal(err)
}
// append new line between each record to split on client side
b = append(b, []byte("\n")...)
_, err = conn.Write(b)
if err != nil {
plog.Fatal(err)
}
return err
}

// FIXME: Only after client connects to it, the agent starts collecting and sending flows.
// This behavior needs to be fixed.
func StartFlowSend(hostPort string) (*FlowStream, error) {
PORT := ":" + hostPort
l, err := net.Listen("tcp", PORT)
if err != nil {
return nil, err
}
defer l.Close()
clientConn, err := l.Accept()

if err != nil {
return nil, err
}

return &FlowStream{
hostPort: hostPort,
clientConn: clientConn,
}, nil
}

func (p *FlowStream) ExportFlows(in <-chan []*flow.Record) {
//Create handler by opening Flow stream
for flowRecord := range in {
if len(flowRecord) > 0 {
for _, record := range flowRecord {
genericMap := decode.PBFlowToMap(flowToPB(record))
err := writeFlow(&genericMap, p.clientConn)
if err != nil {
plog.Fatal(err)
}
}
}
}
}

0 comments on commit ce87754

Please sign in to comment.