-
Notifications
You must be signed in to change notification settings - Fork 32
/
grpc_proto.go
74 lines (66 loc) · 2.78 KB
/
grpc_proto.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package exporter
import (
"context"
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
ovnobserv "github.com/ovn-org/ovn-kubernetes/go-controller/observability-lib/sampledecoder"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
var glog = logrus.WithField("component", "exporter/GRPCProto")
const componentGRPC = "grpc"
// GRPCProto flow exporter. Its ExportFlows method accepts slices of *flow.Record
// by its input channel, converts them to *pbflow.Records instances, and submits
// them to the collector.
type GRPCProto struct {
hostIP string
hostPort int
clientConn *grpc.ClientConnection
// maxFlowsPerMessage limits the maximum number of flows per GRPC message.
// If a message contains more flows than this number, the GRPC message will be split into
// multiple messages.
maxFlowsPerMessage int
metrics *metrics.Metrics
batchCounter prometheus.Counter
sampler *ovnobserv.SampleDecoder
}
func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int, m *metrics.Metrics, s *ovnobserv.SampleDecoder) (*GRPCProto, error) {
clientConn, err := grpc.ConnectClient(hostIP, hostPort)
if err != nil {
return nil, err
}
return &GRPCProto{
hostIP: hostIP,
hostPort: hostPort,
clientConn: clientConn,
maxFlowsPerMessage: maxFlowsPerMessage,
metrics: m,
batchCounter: m.CreateBatchCounter(componentGRPC),
sampler: s,
}, nil
}
// ExportFlows accepts slices of *flow.Record by its input channel, converts them
// to *pbflow.Records instances, and submits them to the collector.
func (g *GRPCProto) ExportFlows(input <-chan []*flow.Record) {
socket := utils.GetSocket(g.hostIP, g.hostPort)
log := glog.WithField("collector", socket)
for inputRecords := range input {
g.metrics.EvictionCounter.WithSource(componentGRPC).Inc()
for _, pbRecords := range pbflow.FlowsToPB(inputRecords, g.maxFlowsPerMessage, g.sampler) {
log.Debugf("sending %d records", len(pbRecords.Entries))
if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil {
g.metrics.Errors.WithErrorName(componentGRPC, "CannotWriteMessage").Inc()
log.WithError(err).Error("couldn't send flow records to collector")
}
g.batchCounter.Inc()
g.metrics.EvictedFlowsCounter.WithSource(componentGRPC).Add(float64(len(pbRecords.Entries)))
}
}
if err := g.clientConn.Close(); err != nil {
log.WithError(err).Warn("couldn't close flow export client")
g.metrics.Errors.WithErrorName(componentGRPC, "CannotCloseClient").Inc()
}
}