conntrack: Add operational metrics (#255)
* Add conntrack_memory_connections metric

* Add conntrack_input_records operational metric

* Add conntrack_output_records operational metric

* Refactor
ronensc committed Jul 26, 2022
1 parent d104d33 commit 94fcbbf
Showing 5 changed files with 80 additions and 0 deletions.
2 changes: 2 additions & 0 deletions docs/api.md
@@ -37,6 +37,7 @@ Following is the supported API format for kafka encode:
readTimeout: timeout (in seconds) for read operation performed by the Writer
batchBytes: limit the maximum size of a request in bytes before being sent to a partition
batchSize: limit on how many messages will be buffered before being sent to a partition
tls: TLS client configuration (optional)
</pre>
## Ingest collector API
Following is the supported API format for the NetFlow / IPFIX collector:
@@ -65,6 +66,7 @@ Following is the supported API format for the kafka ingest:
protobuf: Protobuf decoder
batchMaxLen: the number of accumulated flows before being forwarded for processing
commitInterval: the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously.
tls: TLS client configuration (optional)
</pre>
## Ingest GRPC from Network Observability eBPF Agent
Following is the supported API format for the Network Observability eBPF ingest:
21 changes: 21 additions & 0 deletions docs/operational-metrics.md
@@ -16,6 +16,27 @@ Each table below provides documentation for an exported flowlogs-pipeline operational metric
| **Type** | counter |


### conntrack_memory_connections
| **Name** | conntrack_memory_connections |
|:---|:---|
| **Description** | The total number of tracked connections in memory. |
| **Type** | gauge |


### conntrack_input_records
| **Name** | conntrack_input_records |
|:---|:---|
| **Description** | The total number of input records per classification. |
| **Type** | counter |


### conntrack_output_records
| **Name** | conntrack_output_records |
|:---|:---|
| **Description** | The total number of output records. |
| **Type** | counter |


### ingest_collector_queue_length
| **Name** | ingest_collector_queue_length |
|:---|:---|
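For reference, a sketch of how a scrape of the exporter might render the three new conntrack series. The label values match those set in `pkg/pipeline/extract/conntrack`; the sample numbers are illustrative and assume all output record types are enabled:

```
# HELP conntrack_memory_connections The total number of tracked connections in memory.
# TYPE conntrack_memory_connections gauge
conntrack_memory_connections 1523
# HELP conntrack_input_records The total number of input records per classification.
# TYPE conntrack_input_records counter
conntrack_input_records{classification="newConnection"} 87
conntrack_input_records{classification="update"} 4101
conntrack_input_records{classification="rejected"} 3
# HELP conntrack_output_records The total number of output records.
# TYPE conntrack_output_records counter
conntrack_output_records{type="newConnection"} 87
conntrack_output_records{type="flowLog"} 4188
conntrack_output_records{type="endConnection"} 52
```

With flow-log output enabled, the `flowLog` count tracks the accepted input records (`newConnection` plus `update`).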
9 changes: 9 additions & 0 deletions pkg/operational/metrics/metrics.go
@@ -41,6 +41,15 @@ func NewCounter(opts prometheus.CounterOpts) prometheus.Counter {
return promauto.NewCounter(opts)
}

func NewCounterVec(opts prometheus.CounterOpts, labelNames []string) *prometheus.CounterVec {
metricsOpts = append(metricsOpts, metricDefinition{
Name: opts.Name,
Help: opts.Help,
Type: "counter",
})
return promauto.NewCounterVec(opts, labelNames)
}

func NewGauge(opts prometheus.GaugeOpts) prometheus.Gauge {
metricsOpts = append(metricsOpts, metricDefinition{
Name: opts.Name,
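A minimal usage sketch for the new `NewCounterVec` wrapper. The metric name and label below are hypothetical and only illustrate the call; like the existing constructors, the wrapper records the metric in the package's definition list and registers it through promauto:

```go
package example

import (
	operationalMetrics "github.com/netobserv/flowlogs-pipeline/pkg/operational/metrics"
	"github.com/prometheus/client_golang/prometheus"
)

// Hypothetical labeled counter created through the operational metrics wrapper.
var stageErrors = operationalMetrics.NewCounterVec(prometheus.CounterOpts{
	Name: "example_stage_errors_total",
	Help: "Hypothetical: errors encountered, partitioned by pipeline stage.",
}, []string{"stage"})

func onDecodeError() {
	// Increment the series for the "decode" label value.
	stageErrors.WithLabelValues("decode").Inc()
}
```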
8 changes: 8 additions & 0 deletions pkg/pipeline/extract/conntrack/conntrack.go
@@ -62,6 +62,7 @@ func (cs *connectionStore) addConnection(hashId uint64, conn connection) {
}
e := cs.connList.PushBack(conn)
cs.hash2conn[hashId] = e
metrics.connStoreLength.Set(float64(cs.connList.Len()))
}

func (cs *connectionStore) getConnection(hashId uint64) (connection, bool) {
@@ -94,6 +95,7 @@ func (cs *connectionStore) iterateOldToNew(f processConnF) {
if shouldDelete {
delete(cs.hash2conn, conn.getHash().hashTotal)
cs.connList.Remove(e)
metrics.connStoreLength.Set(float64(cs.connList.Len()))
}
if shouldStop {
break
@@ -130,6 +132,7 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericMap {
computedHash, err := ComputeHash(fl, ct.config.KeyDefinition, ct.hashProvider())
if err != nil {
log.Warningf("skipping flow log %v: %v", fl, err)
metrics.inputRecords.WithLabelValues("rejected").Inc()
continue
}
conn, exists := ct.connStore.getConnection(computedHash.hashTotal)
@@ -142,27 +145,32 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericMap {
Build()
ct.connStore.addConnection(computedHash.hashTotal, conn)
ct.updateConnection(conn, fl, computedHash)
metrics.inputRecords.WithLabelValues("newConnection").Inc()
if ct.shouldOutputNewConnection {
record := conn.toGenericMap()
addHashField(record, computedHash.hashTotal)
addTypeField(record, api.ConnTrackOutputRecordTypeName("NewConnection"))
outputRecords = append(outputRecords, record)
metrics.outputRecords.WithLabelValues("newConnection").Inc()
}
} else {
ct.updateConnection(conn, fl, computedHash)
metrics.inputRecords.WithLabelValues("update").Inc()
}

if ct.shouldOutputFlowLogs {
record := fl.Copy()
addHashField(record, computedHash.hashTotal)
addTypeField(record, api.ConnTrackOutputRecordTypeName("FlowLog"))
outputRecords = append(outputRecords, record)
metrics.outputRecords.WithLabelValues("flowLog").Inc()
}
}

endConnectionRecords := ct.popEndConnections()
if ct.shouldOutputEndConnection {
outputRecords = append(outputRecords, endConnectionRecords...)
metrics.outputRecords.WithLabelValues("endConnection").Add(float64(len(endConnectionRecords)))
}

return outputRecords
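Since the counters are package-level, the instrumentation added to `Extract` can be asserted from a test in the same package. A sketch using `client_golang`'s test helper; the direct `Inc()` stands in for driving `Extract` with a real flow log:

```go
package conntrack

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestInputRecordsCounterSketch(t *testing.T) {
	series := metrics.inputRecords.WithLabelValues("update")
	before := testutil.ToFloat64(series)

	// A real test would feed Extract() a flow log that updates an existing
	// connection; incrementing directly keeps this sketch self-contained.
	series.Inc()

	if after := testutil.ToFloat64(series); after != before+1 {
		t.Errorf("expected the update classification to grow by 1, got %v -> %v", before, after)
	}
}
```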
40 changes: 40 additions & 0 deletions pkg/pipeline/extract/conntrack/metrics.go
@@ -0,0 +1,40 @@
package conntrack

import (
operationalMetrics "github.com/netobserv/flowlogs-pipeline/pkg/operational/metrics"
"github.com/prometheus/client_golang/prometheus"
)

const (
classificationLabel = "classification"
typeLabel = "type"
)

var metrics = newMetrics()

type metricsType struct {
connStoreLength prometheus.Gauge
inputRecords *prometheus.CounterVec
outputRecords *prometheus.CounterVec
}

func newMetrics() *metricsType {
var m metricsType

m.connStoreLength = operationalMetrics.NewGauge(prometheus.GaugeOpts{
Name: "conntrack_memory_connections",
Help: "The total number of tracked connections in memory.",
})

m.inputRecords = operationalMetrics.NewCounterVec(prometheus.CounterOpts{
Name: "conntrack_input_records",
Help: "The total number of input records per classification.",
}, []string{classificationLabel})

m.outputRecords = operationalMetrics.NewCounterVec(prometheus.CounterOpts{
Name: "conntrack_output_records",
Help: "The total number of output records.",
}, []string{typeLabel})

return &m
}
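Because `metrics` is initialized at package level and the operational wrappers use promauto, the three series are registered with the default Prometheus registry as soon as the conntrack package is imported. A sketch (hypothetical standalone program, assuming the module is available as a dependency) that lists them by gathering from the default registry:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/client_golang/prometheus"

	// Imported for its side effect of registering the conntrack metrics.
	_ "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/conntrack"
)

func main() {
	families, err := prometheus.DefaultGatherer.Gather()
	if err != nil {
		panic(err)
	}
	// Print the name and type of every conntrack_* metric family.
	for _, mf := range families {
		if strings.HasPrefix(mf.GetName(), "conntrack_") {
			fmt.Println(mf.GetName(), mf.GetType().String())
		}
	}
}
```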
