diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index cfab9ffce..759143f40 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -253,6 +253,8 @@ func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) { return buildIPFIXExporter(cfg, "tcp") case "direct-flp": return buildDirectFLPExporter(cfg) + case "tcp": + return buildFlowStreamExporter(cfg) default: return nil, fmt.Errorf("wrong export type %s. Admitted values are grpc, kafka", cfg.Export) } @@ -339,6 +341,18 @@ func buildIPFIXExporter(cfg *Config, proto string) (node.TerminalFunc[[]*flow.Re return ipfix.ExportFlows, nil } +func buildFlowStreamExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) { + if cfg.TargetPort == 0 { + return nil, fmt.Errorf("missing port: %d", cfg.TargetPort) + } + flowStreamer, err := exporter.StartFlowSend(fmt.Sprintf("%d", cfg.TargetPort)) + if err != nil { + return nil, err + } + + return flowStreamer.ExportFlows, err +} + // Run a Flows agent. The function will keep running in the same thread // until the passed context is canceled func (f *Flows) Run(ctx context.Context) error { diff --git a/pkg/agent/config.go b/pkg/agent/config.go index 63edce4d3..58a280069 100644 --- a/pkg/agent/config.go +++ b/pkg/agent/config.go @@ -35,7 +35,7 @@ type Config struct { // If the AgentIP configuration property is set, this property has no effect. AgentIPType string `env:"AGENT_IP_TYPE" envDefault:"any"` // Export selects the flows' exporter protocol. Accepted values are: grpc (default), kafka, - // ipfix+udp, ipfix+tcp or direct-flp. + // ipfix+udp, ipfix+tcp, tcp or direct-flp. Export string `env:"EXPORT" envDefault:"grpc"` // TargetHost is the host name or IP of the target Flow collector, when the EXPORT variable is // set to "grpc" diff --git a/pkg/exporter/flows.go b/pkg/exporter/flows.go new file mode 100644 index 000000000..d409b6564 --- /dev/null +++ b/pkg/exporter/flows.go @@ -0,0 +1,66 @@ +package exporter + +import ( + "encoding/json" + "net" + + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/netobserv-ebpf-agent/pkg/decode" + "github.com/netobserv/netobserv-ebpf-agent/pkg/flow" +) + +type FlowStream struct { + hostPort string + clientConn net.Conn +} + +// WriteFlow writes the given flow data out to the file. +func writeFlow(record *config.GenericMap, conn net.Conn) error { + b, err := json.Marshal(record) + if err != nil { + plog.Fatal(err) + } + // append new line between each record to split on client side + b = append(b, []byte("\n")...) + _, err = conn.Write(b) + if err != nil { + plog.Fatal(err) + } + return err +} + +// FIXME: Only after client connects to it, the agent starts collecting and sending flows. +// This behavior needs to be fixed. +func StartFlowSend(hostPort string) (*FlowStream, error) { + PORT := ":" + hostPort + l, err := net.Listen("tcp", PORT) + if err != nil { + return nil, err + } + defer l.Close() + clientConn, err := l.Accept() + + if err != nil { + return nil, err + } + + return &FlowStream{ + hostPort: hostPort, + clientConn: clientConn, + }, nil +} + +func (p *FlowStream) ExportFlows(in <-chan []*flow.Record) { + //Create handler by opening Flow stream + for flowRecord := range in { + if len(flowRecord) > 0 { + for _, record := range flowRecord { + genericMap := decode.PBFlowToMap(flowToPB(record)) + err := writeFlow(&genericMap, p.clientConn) + if err != nil { + plog.Fatal(err) + } + } + } + } +}