diff --git a/docs/operational-metrics.md b/docs/operational-metrics.md index 10524db69..5a7e0b782 100644 --- a/docs/operational-metrics.md +++ b/docs/operational-metrics.md @@ -58,8 +58,8 @@ Each table below provides documentation for an exported flowlogs-pipeline operat ### ingest_flows_processed | **Name** | ingest_flows_processed | |:---|:---| -| **Description** | Provides number of flows processed, batches processed, and batch size stats (in number of flows) | -| **Type** | summary | +| **Description** | Number of flows received by the ingester | +| **Type** | counter | | **Labels** | stage | diff --git a/pkg/pipeline/ingest/ingest_grpc.go b/pkg/pipeline/ingest/ingest_grpc.go index aac8f4b67..283b2c19c 100644 --- a/pkg/pipeline/ingest/ingest_grpc.go +++ b/pkg/pipeline/ingest/ingest_grpc.go @@ -95,8 +95,8 @@ func instrumentGRPC(m *metrics) grpc2.UnaryServerInterceptor { m.latency.Observe(delay) } - // instrument batch size distribution (which also instruments total flows counter under the hood) - m.batchSize.Observe(float64(len(flowRecords.Entries))) + // instrument flows processed counter + m.flowsProcessed.Add(float64(len(flowRecords.Entries))) // instrument message bytes m.batchSizeBytes.Observe(float64(proto.Size(flowRecords))) diff --git a/pkg/pipeline/ingest/ingest_kafka.go b/pkg/pipeline/ingest/ingest_kafka.go index e67fe317d..6571f94ea 100644 --- a/pkg/pipeline/ingest/ingest_kafka.go +++ b/pkg/pipeline/ingest/ingest_kafka.go @@ -79,8 +79,7 @@ func (k *ingestKafka) kafkaListener() { if k.canLogMessages { log.Debugf("string(kafkaMessage) = %s\n", string(kafkaMessage.Value)) } - // We don't know how many messages were in kafka internal batches, so just increment per-message - k.metrics.batchSize.Observe(1) + k.metrics.flowsProcessed.Inc() messageLen := len(kafkaMessage.Value) k.metrics.batchSizeBytes.Observe(float64(messageLen) + float64(len(kafkaMessage.Key))) if messageLen > 0 { diff --git a/pkg/pipeline/ingest/metrics.go b/pkg/pipeline/ingest/metrics.go index c49f1a424..d59f761f2 100644 --- a/pkg/pipeline/ingest/metrics.go +++ b/pkg/pipeline/ingest/metrics.go @@ -13,10 +13,10 @@ var ( operational.TypeHistogram, "stage", ) - batchSizeSummary = operational.DefineMetric( - "ingest_flows_processed", // This is intentionally named to emphasize its utility for flows counting, despite being a batch size distribution - "Provides number of flows processed, batches processed, and batch size stats (in number of flows)", - operational.TypeSummary, + flowsProcessedCounter = operational.DefineMetric( + "ingest_flows_processed", + "Number of flows received by the ingester", + operational.TypeCounter, "stage", ) batchSizeBytesSummary = operational.DefineMetric( @@ -39,7 +39,7 @@ type metrics struct { stageType string stageDuration prometheus.Observer latency prometheus.Histogram - batchSize prometheus.Summary + flowsProcessed prometheus.Counter batchSizeBytes prometheus.Summary errors *prometheus.CounterVec } @@ -52,7 +52,7 @@ func newMetrics(opMetrics *operational.Metrics, stage, stageType string, inGauge stageType: stageType, latency: opMetrics.NewHistogram(&latencyHistogram, []float64{.001, .01, .1, 1, 10, 100, 1000, 10000}, stage), stageDuration: opMetrics.GetOrCreateStageDurationHisto().WithLabelValues(stage), - batchSize: opMetrics.NewSummary(&batchSizeSummary, stage), + flowsProcessed: opMetrics.NewCounter(&flowsProcessedCounter, stage), batchSizeBytes: opMetrics.NewSummary(&batchSizeBytesSummary, stage), errors: opMetrics.NewCounterVec(&errorsCounter), }