diff --git a/pkg/systemstatsmonitor/net_collector.go b/pkg/systemstatsmonitor/net_collector.go index d7447b66d..4fd9cbc45 100644 --- a/pkg/systemstatsmonitor/net_collector.go +++ b/pkg/systemstatsmonitor/net_collector.go @@ -17,267 +17,196 @@ limitations under the License. package systemstatsmonitor import ( - "github.com/golang/glog" - "github.com/prometheus/procfs" + "fmt" + ssmtypes "k8s.io/node-problem-detector/pkg/systemstatsmonitor/types" "k8s.io/node-problem-detector/pkg/util/metrics" + + "github.com/golang/glog" + "github.com/prometheus/procfs" ) type netCollector struct { - mNetDevRxBytes *metrics.Int64Metric - mNetDevRxPackets *metrics.Int64Metric - mNetDevRxErrors *metrics.Int64Metric - mNetDevRxDropped *metrics.Int64Metric - mNetDevRxFifo *metrics.Int64Metric - mNetDevRxFrame *metrics.Int64Metric - mNetDevRxCompressed *metrics.Int64Metric - mNetDevRxMulticast *metrics.Int64Metric - mNetDevTxBytes *metrics.Int64Metric - mNetDevTxPackets *metrics.Int64Metric - mNetDevTxErrors *metrics.Int64Metric - mNetDevTxDropped *metrics.Int64Metric - mNetDevTxFifo *metrics.Int64Metric - mNetDevTxCollisions *metrics.Int64Metric - mNetDevTxCarrier *metrics.Int64Metric - mNetDevTxCompressed *metrics.Int64Metric - - config *ssmtypes.NetStatsConfig + config *ssmtypes.NetStatsConfig + recorder *ifaceStatRecorder } func NewNetCollectorOrDie(netConfig *ssmtypes.NetStatsConfig) *netCollector { - nc := netCollector{config: netConfig} - - var err error + nc := netCollector{ + config: netConfig, + recorder: newIfaceStatRecorder(), + } - nc.mNetDevRxBytes, err = metrics.NewInt64Metric( + nc.mustRegisterMetric( metrics.NetDevRxBytes, - netConfig.MetricsConfigs[string(metrics.NetDevRxBytes)].DisplayName, "Cumulative count of bytes received.", "Byte", metrics.Sum, - []string{interfaceNameLabel}) - if err != nil { - glog.Fatalf("Error initializing metric for %q: %v", metrics.NetDevRxBytes, err) - } + func(stat procfs.NetDevLine) int64 { + return int64(stat.RxBytes) + }, + ) - nc.mNetDevRxPackets, err = metrics.NewInt64Metric( + nc.mustRegisterMetric( metrics.NetDevRxPackets, - netConfig.MetricsConfigs[string(metrics.NetDevRxPackets)].DisplayName, "Cumulative count of packets received.", "1", metrics.Sum, - []string{interfaceNameLabel}) - if err != nil { - glog.Fatalf("Error initializing metric for %q: %v", metrics.NetDevRxPackets, err) - } + func(stat procfs.NetDevLine) int64 { + return int64(stat.RxPackets) + }, + ) - nc.mNetDevRxErrors, err = metrics.NewInt64Metric( + nc.mustRegisterMetric( metrics.NetDevRxErrors, - netConfig.MetricsConfigs[string(metrics.NetDevRxErrors)].DisplayName, "Cumulative count of receive errors encountered.", "1", metrics.Sum, - []string{interfaceNameLabel}) - if err != nil { - glog.Fatalf("Error initializing metric for %q: %v", metrics.NetDevRxErrors, err) - } + func(stat procfs.NetDevLine) int64 { + return int64(stat.RxErrors) + }, + ) - nc.mNetDevRxDropped, err = metrics.NewInt64Metric( + nc.mustRegisterMetric( metrics.NetDevRxDropped, - netConfig.MetricsConfigs[string(metrics.NetDevRxDropped)].DisplayName, "Cumulative count of packets dropped while receiving.", "1", metrics.Sum, - []string{interfaceNameLabel}) - if err != nil { - glog.Fatalf("Error initializing metric for %q: %v", metrics.NetDevRxDropped, err) - } + func(stat procfs.NetDevLine) int64 { + return int64(stat.RxDropped) + }, + ) - nc.mNetDevRxFifo, err = metrics.NewInt64Metric( + nc.mustRegisterMetric( metrics.NetDevRxFifo, - netConfig.MetricsConfigs[string(metrics.NetDevRxFifo)].DisplayName, "Cumulative count of FIFO buffer errors.", "1", metrics.Sum, - []string{interfaceNameLabel}) - if err != nil { - glog.Fatalf("Error initializing metric for %q: %v", metrics.NetDevRxFifo, err) - } + func(stat procfs.NetDevLine) int64 { + return int64(stat.RxFIFO) + }, + ) - nc.mNetDevRxFrame, err = metrics.NewInt64Metric( + nc.mustRegisterMetric( metrics.NetDevRxFrame, - netConfig.MetricsConfigs[string(metrics.NetDevRxFrame)].DisplayName, "Cumulative count of packet framing errors.", "1", metrics.Sum, - []string{interfaceNameLabel}) - if err != nil { - glog.Fatalf("Error initializing metric for %q: %v", metrics.NetDevRxFrame, err) - } + func(stat procfs.NetDevLine) int64 { + return int64(stat.RxFrame) + }, + ) - nc.mNetDevRxCompressed, err = metrics.NewInt64Metric( + nc.mustRegisterMetric( metrics.NetDevRxCompressed, - netConfig.MetricsConfigs[string(metrics.NetDevRxCompressed)].DisplayName, "Cumulative count of compressed packets received by the device driver.", "1", metrics.Sum, - []string{interfaceNameLabel}) - if err != nil { - glog.Fatalf("Error initializing metric for %q: %v", metrics.NetDevRxCompressed, err) - } + func(stat procfs.NetDevLine) int64 { + return int64(stat.RxCompressed) + }, + ) - nc.mNetDevRxMulticast, err = metrics.NewInt64Metric( + nc.mustRegisterMetric( metrics.NetDevRxMulticast, - netConfig.MetricsConfigs[string(metrics.NetDevRxMulticast)].DisplayName, "Cumulative count of multicast frames received by the device driver.", "1", metrics.Sum, - []string{interfaceNameLabel}) - if err != nil { - glog.Fatalf("Error initializing metric for %q: %v", metrics.NetDevRxMulticast, err) - } + func(stat procfs.NetDevLine) int64 { + return int64(stat.RxMulticast) + }, + ) - nc.mNetDevTxBytes, err = metrics.NewInt64Metric( + nc.mustRegisterMetric( metrics.NetDevTxBytes, - netConfig.MetricsConfigs[string(metrics.NetDevTxBytes)].DisplayName, "Cumulative count of bytes transmitted.", "Byte", metrics.Sum, - []string{interfaceNameLabel}) - if err != nil { - glog.Fatalf("Error initializing metric for %q: %v", metrics.NetDevTxBytes, err) - } - - nc.mNetDevTxPackets, err = metrics.NewInt64Metric( + func(stat procfs.NetDevLine) int64 { + return int64(stat.TxBytes) + }, + ) + nc.mustRegisterMetric( metrics.NetDevTxPackets, - netConfig.MetricsConfigs[string(metrics.NetDevTxPackets)].DisplayName, "Cumulative count of packets transmitted.", "1", metrics.Sum, - []string{interfaceNameLabel}) - if err != nil { - glog.Fatalf("Error initializing metric for %q: %v", metrics.NetDevTxPackets, err) - } - - nc.mNetDevTxErrors, err = metrics.NewInt64Metric( + func(stat procfs.NetDevLine) int64 { + return int64(stat.TxPackets) + }, + ) + nc.mustRegisterMetric( metrics.NetDevTxErrors, - netConfig.MetricsConfigs[string(metrics.NetDevTxErrors)].DisplayName, "Cumulative count of transmit errors encountered.", "1", metrics.Sum, - []string{interfaceNameLabel}) - if err != nil { - glog.Fatalf("Error initializing metric for %q: %v", metrics.NetDevTxErrors, err) - } - - nc.mNetDevTxDropped, err = metrics.NewInt64Metric( + func(stat procfs.NetDevLine) int64 { + return int64(stat.TxErrors) + }, + ) + nc.mustRegisterMetric( metrics.NetDevTxDropped, - netConfig.MetricsConfigs[string(metrics.NetDevTxDropped)].DisplayName, "Cumulative count of packets dropped while transmitting.", "1", metrics.Sum, - []string{interfaceNameLabel}) - if err != nil { - glog.Fatalf("Error initializing metric for %q: %v", metrics.NetDevTxDropped, err) - } - - nc.mNetDevTxFifo, err = metrics.NewInt64Metric( + func(stat procfs.NetDevLine) int64 { + return int64(stat.TxDropped) + }, + ) + nc.mustRegisterMetric( metrics.NetDevTxFifo, - netConfig.MetricsConfigs[string(metrics.NetDevTxFifo)].DisplayName, "Cumulative count of FIFO buffer errors.", "1", metrics.Sum, - []string{interfaceNameLabel}) - if err != nil { - glog.Fatalf("Error initializing metric for %q: %v", metrics.NetDevTxFifo, err) - } - - nc.mNetDevTxCollisions, err = metrics.NewInt64Metric( + func(stat procfs.NetDevLine) int64 { + return int64(stat.TxFIFO) + }, + ) + nc.mustRegisterMetric( metrics.NetDevTxCollisions, - netConfig.MetricsConfigs[string(metrics.NetDevTxCollisions)].DisplayName, "Cumulative count of collisions detected on the interface.", "1", metrics.Sum, - []string{interfaceNameLabel}) - if err != nil { - glog.Fatalf("Error initializing metric for %q: %v", metrics.NetDevTxCollisions, err) - } - - nc.mNetDevTxCarrier, err = metrics.NewInt64Metric( + func(stat procfs.NetDevLine) int64 { + return int64(stat.TxCollisions) + }, + ) + nc.mustRegisterMetric( metrics.NetDevTxCarrier, - netConfig.MetricsConfigs[string(metrics.NetDevTxCarrier)].DisplayName, "Cumulative count of carrier losses detected by the device driver.", "1", metrics.Sum, - []string{interfaceNameLabel}) - if err != nil { - glog.Fatalf("Error initializing metric for %q: %v", metrics.NetDevTxCarrier, err) - } - - nc.mNetDevTxCompressed, err = metrics.NewInt64Metric( + func(stat procfs.NetDevLine) int64 { + return int64(stat.TxCarrier) + }, + ) + nc.mustRegisterMetric( metrics.NetDevTxCompressed, - netConfig.MetricsConfigs[string(metrics.NetDevTxCompressed)].DisplayName, "Cumulative count of compressed packets transmitted by the device driver.", "1", metrics.Sum, - []string{interfaceNameLabel}) - if err != nil { - glog.Fatalf("Error initializing metric for %q: %v", metrics.NetDevTxCompressed, err) - } + func(stat procfs.NetDevLine) int64 { + return int64(stat.TxCompressed) + }, + ) return &nc } -func (nc *netCollector) recordNetDev() { - if nc.mNetDevRxBytes == nil { - return - } - if nc.mNetDevRxPackets == nil { - return - } - if nc.mNetDevRxErrors == nil { - return - } - if nc.mNetDevRxDropped == nil { - return - } - if nc.mNetDevRxFifo == nil { - return - } - if nc.mNetDevRxFrame == nil { - return - } - if nc.mNetDevRxCompressed == nil { - return - } - if nc.mNetDevRxMulticast == nil { - return - } - if nc.mNetDevTxBytes == nil { - return - } - if nc.mNetDevTxPackets == nil { - return - } - if nc.mNetDevTxErrors == nil { - return - } - if nc.mNetDevTxDropped == nil { - return - } - if nc.mNetDevTxFifo == nil { - return - } - if nc.mNetDevTxCollisions == nil { - return +func (nc *netCollector) mustRegisterMetric(metricID metrics.MetricID, description, unit string, + aggregation metrics.Aggregation, exporter func(stat procfs.NetDevLine) int64) { + metricConfig, ok := nc.config.MetricsConfigs[string(metricID)] + if !ok { + glog.Fatalf("Metric config `%q` not found", metricID) } - if nc.mNetDevTxCarrier == nil { - return - } - if nc.mNetDevTxCompressed == nil { - return + err := nc.recorder.Register(metricID, metricConfig.DisplayName, description, unit, + aggregation, []string{interfaceNameLabel}, exporter) + if err != nil { + glog.Fatalf("Failed to initialize metric %q: %v", metricID, err) } +} +func (nc *netCollector) recordNetDev() { fs, err := procfs.NewFS("/proc") stats, err := fs.NetDev() if err != nil { @@ -289,22 +218,7 @@ func (nc *netCollector) recordNetDev() { tags := map[string]string{} tags[interfaceNameLabel] = iface - nc.mNetDevRxBytes.Record(tags, int64(ifaceStats.RxBytes)) - nc.mNetDevRxPackets.Record(tags, int64(ifaceStats.RxPackets)) - nc.mNetDevRxErrors.Record(tags, int64(ifaceStats.RxErrors)) - nc.mNetDevRxDropped.Record(tags, int64(ifaceStats.RxDropped)) - nc.mNetDevRxFifo.Record(tags, int64(ifaceStats.RxFIFO)) - nc.mNetDevRxFrame.Record(tags, int64(ifaceStats.RxFrame)) - nc.mNetDevRxCompressed.Record(tags, int64(ifaceStats.RxCompressed)) - nc.mNetDevRxMulticast.Record(tags, int64(ifaceStats.RxMulticast)) - nc.mNetDevTxBytes.Record(tags, int64(ifaceStats.TxBytes)) - nc.mNetDevTxPackets.Record(tags, int64(ifaceStats.TxPackets)) - nc.mNetDevTxErrors.Record(tags, int64(ifaceStats.TxErrors)) - nc.mNetDevTxDropped.Record(tags, int64(ifaceStats.TxDropped)) - nc.mNetDevTxFifo.Record(tags, int64(ifaceStats.TxFIFO)) - nc.mNetDevTxCollisions.Record(tags, int64(ifaceStats.TxCollisions)) - nc.mNetDevTxCarrier.Record(tags, int64(ifaceStats.TxCarrier)) - nc.mNetDevTxCompressed.Record(tags, int64(ifaceStats.TxCompressed)) + nc.recorder.RecordWithSameTags(ifaceStats, tags) } } @@ -315,3 +229,43 @@ func (nc *netCollector) collect() { nc.recordNetDev() } + +// TODO(@oif): Maybe implements a generic recorder +type ifaceStatRecorder struct { + collectors map[metrics.MetricID]ifaceStatCollector +} + +func newIfaceStatRecorder() *ifaceStatRecorder { + return &ifaceStatRecorder{collectors: make(map[metrics.MetricID]ifaceStatCollector)} +} + +func (r *ifaceStatRecorder) Register(metricID metrics.MetricID, viewName string, description string, + unit string, aggregation metrics.Aggregation, tagNames []string, exporter func(procfs.NetDevLine) int64) error { + if _, ok := r.collectors[metricID]; ok { + // Check duplication + return fmt.Errorf("metric %q already registered", metricID) + } + metric, err := metrics.NewInt64Metric(metricID, viewName, description, unit, aggregation, tagNames) + if err != nil { + return err + } + r.collectors[metricID] = ifaceStatCollector{ + metric: metric, + exporter: exporter, + } + return nil +} + +func (r ifaceStatRecorder) RecordWithSameTags(stat procfs.NetDevLine, tags map[string]string) { + // Range all registered collector and record its measurement with same tags + for metricID, collector := range r.collectors { + measurement := collector.exporter(stat) + collector.metric.Record(tags, measurement) + glog.V(6).Infof("Metric %q record measurement %f with tags %v", metricID, measurement, tags) + } +} + +type ifaceStatCollector struct { + metric *metrics.Int64Metric + exporter func(procfs.NetDevLine) int64 +}