From 6a0f65b60e7f6f3671d003755ce224038d1f07e6 Mon Sep 17 00:00:00 2001
From: Ronen Schaffer
Date: Wed, 8 Feb 2023 15:18:02 +0200
Subject: [PATCH] Fix rebase

---
 docs/operational-metrics.md                 | 121 ++++++++++++++++++
 pkg/pipeline/extract/conntrack/conntrack.go |   2 +-
 .../extract/conntrack/conntrack_test.go     |   8 +-
 pkg/pipeline/extract/conntrack/store.go     |   4 +
 4 files changed, 130 insertions(+), 5 deletions(-)

diff --git a/docs/operational-metrics.md b/docs/operational-metrics.md
index e69de29bb..57ec2cfa7 100644
--- a/docs/operational-metrics.md
+++ b/docs/operational-metrics.md
@@ -0,0 +1,121 @@
+
+> Note: this file was automatically generated, to update execute "make docs"
+
+# flowlogs-pipeline Operational Metrics
+
+Each table below provides documentation for an exported flowlogs-pipeline operational metric.
+
+
+
+### conntrack_input_records
+| **Name** | conntrack_input_records |
+|:---|:---|
+| **Description** | The total number of input records per classification. |
+| **Type** | counter |
+| **Labels** | classification |
+
+
+### conntrack_memory_connections
+| **Name** | conntrack_memory_connections |
+|:---|:---|
+| **Description** | The total number of tracked connections in memory. |
+| **Type** | gauge |
+| **Labels** | group |
+
+
+### conntrack_output_records
+| **Name** | conntrack_output_records |
+|:---|:---|
+| **Description** | The total number of output records. |
+| **Type** | counter |
+| **Labels** | type |
+
+
+### encode_prom_errors
+| **Name** | encode_prom_errors |
+|:---|:---|
+| **Description** | Total errors during metrics generation |
+| **Type** | counter |
+| **Labels** | error, metric, key |
+
+
+### ingest_batch_size_bytes
+| **Name** | ingest_batch_size_bytes |
+|:---|:---|
+| **Description** | Ingested batch size distribution, in bytes |
+| **Type** | summary |
+| **Labels** | stage |
+
+
+### ingest_errors
+| **Name** | ingest_errors |
+|:---|:---|
+| **Description** | Counter of errors during ingestion |
+| **Type** | counter |
+| **Labels** | stage, type, code |
+
+
+### ingest_flows_processed
+| **Name** | ingest_flows_processed |
+|:---|:---|
+| **Description** | Number of flows received by the ingester |
+| **Type** | counter |
+| **Labels** | stage |
+
+
+### ingest_latency_ms
+| **Name** | ingest_latency_ms |
+|:---|:---|
+| **Description** | Latency between flow end time and ingest time, in milliseconds |
+| **Type** | histogram |
+| **Labels** | stage |
+
+
+### metrics_dropped
+| **Name** | metrics_dropped |
+|:---|:---|
+| **Description** | Number of metrics dropped |
+| **Type** | counter |
+| **Labels** | stage |
+
+
+### metrics_processed
+| **Name** | metrics_processed |
+|:---|:---|
+| **Description** | Number of metrics processed |
+| **Type** | counter |
+| **Labels** | stage |
+
+
+### records_written
+| **Name** | records_written |
+|:---|:---|
+| **Description** | Number of output records written |
+| **Type** | counter |
+| **Labels** | stage |
+
+
+### stage_duration_ms
+| **Name** | stage_duration_ms |
+|:---|:---|
+| **Description** | Pipeline stage duration in milliseconds |
+| **Type** | histogram |
+| **Labels** | stage |
+
+
+### stage_in_queue_size
+| **Name** | stage_in_queue_size |
+|:---|:---|
+| **Description** | Pipeline stage input queue size (number of elements in queue) |
+| **Type** | gauge |
+| **Labels** | stage |
+
+
+### stage_out_queue_size
+| **Name** | stage_out_queue_size |
+|:---|:---|
+| **Description** | Pipeline stage output queue size (number of elements in queue) |
+| **Type** | gauge |
+| **Labels** | stage |
+
+
diff --git a/pkg/pipeline/extract/conntrack/conntrack.go b/pkg/pipeline/extract/conntrack/conntrack.go
index 4e5ae9990..83e4ed203 100644
--- a/pkg/pipeline/extract/conntrack/conntrack.go
+++ b/pkg/pipeline/extract/conntrack/conntrack.go
@@ -67,7 +67,7 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericM
 		}
 		conn, exists := ct.connStore.getConnection(computedHash.hashTotal)
 		if !exists {
-			if (ct.config.MaxConnectionsTracked > 0) && (ct.config.MaxConnectionsTracked <= ct.connStore.mom.Len()) {
+			if (ct.config.MaxConnectionsTracked > 0) && (ct.connStore.len() >= ct.config.MaxConnectionsTracked) {
 				log.Warningf("too many connections; skipping flow log %v: ", fl)
 				ct.metrics.inputRecords.WithLabelValues("discarded").Inc()
 			} else {
diff --git a/pkg/pipeline/extract/conntrack/conntrack_test.go b/pkg/pipeline/extract/conntrack/conntrack_test.go
index 30b702a06..0d64fee09 100644
--- a/pkg/pipeline/extract/conntrack/conntrack_test.go
+++ b/pkg/pipeline/extract/conntrack/conntrack_test.go
@@ -896,17 +896,17 @@ func TestMaxConnections(t *testing.T) {
 	require.NoError(t, err)
 	ct := extract.(*conntrackImpl)
 
-	require.Equal(t, 0, ct.connStore.mom.Len())
+	require.Equal(t, 0, ct.connStore.len())
 
 	flowLogs := test.GenerateConnectionEntries(10)
 	ct.Extract(flowLogs)
-	require.Equal(t, 10, ct.connStore.mom.Len())
+	require.Equal(t, 10, ct.connStore.len())
 
 	flowLogs = test.GenerateConnectionEntries(20)
 	ct.Extract(flowLogs)
-	require.Equal(t, 20, ct.connStore.mom.Len())
+	require.Equal(t, 20, ct.connStore.len())
 
 	flowLogs = test.GenerateConnectionEntries(40)
 	ct.Extract(flowLogs)
-	require.Equal(t, maxConnections, ct.connStore.mom.Len())
+	require.Equal(t, maxConnections, ct.connStore.len())
 }
diff --git a/pkg/pipeline/extract/conntrack/store.go b/pkg/pipeline/extract/conntrack/store.go
index c29f14953..a857d2ce4 100644
--- a/pkg/pipeline/extract/conntrack/store.go
+++ b/pkg/pipeline/extract/conntrack/store.go
@@ -175,6 +175,10 @@ func (cs *connectionStore) prepareUpdateConnections() []connection {
 	return connections
 }
 
+func (cs *connectionStore) len() int {
+	return len(cs.hashId2groupIdx)
+}
+
 // schedulingGroupToLabelValue returns a string representation of a scheduling group to be used as a Prometheus label
 // value.
 func schedulingGroupToLabelValue(groupIdx int, group api.ConnTrackSchedulingGroup) string {
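Note on the functional change: the `MaxConnectionsTracked` guard in conntrack.go now reads the store size through the new `connStore.len()` helper instead of reaching into the underlying map directly (`mom.Len()`). Below is a minimal, self-contained Go sketch of that bound-check pattern, assuming simplified stand-in types; `tracker`, `connectionStore`, and the field names here are illustrative and are not the project's actual implementation.

```go
package main

import "fmt"

// connectionStore is a stand-in for the real store: it only needs to
// expose its current size so the extractor can enforce an upper bound.
type connectionStore struct {
	hashID2GroupIdx map[uint64]int
}

// len reports how many connections are currently tracked, mirroring the
// helper this patch adds to store.go.
func (cs *connectionStore) len() int {
	return len(cs.hashID2GroupIdx)
}

// tracker is a hypothetical holder for the configured limit and the store.
type tracker struct {
	maxConnectionsTracked int
	store                 *connectionStore
}

// track adds a new connection hash unless the configured limit is reached.
// The bound is only enforced when maxConnectionsTracked > 0, matching the
// guard in conntrack.go; otherwise tracking is effectively unlimited.
func (t *tracker) track(hash uint64) bool {
	if t.maxConnectionsTracked > 0 && t.store.len() >= t.maxConnectionsTracked {
		fmt.Printf("too many connections; discarding hash %d\n", hash)
		return false
	}
	t.store.hashID2GroupIdx[hash] = 0
	return true
}

func main() {
	t := &tracker{
		maxConnectionsTracked: 2,
		store:                 &connectionStore{hashID2GroupIdx: map[uint64]int{}},
	}
	for _, h := range []uint64{1, 2, 3} {
		fmt.Println("tracked:", t.track(h)) // the third hash is rejected
	}
}
```

Running the sketch tracks the first two hashes and rejects the third, analogous to how flow logs beyond the configured limit are skipped and counted under `conntrack_input_records` with the `classification="discarded"` label.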