Restore ingest_flows_processed as a simple counter (#322)
This removes it as a summary of batch sizes.
Average batch sizes can however still be derived, by dividing "Flows
processed" by the "Batch size in bytes" count.
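
In Prometheus terms, that derivation could look like the following query (a hedged sketch, assuming the byte-size summary is exported as ingest_batch_size_bytes, a name not confirmed by this commit, whose _count series tracks the number of batches observed):

# average flows per batch, per stage, over the last 5 minutes
# (ingest_batch_size_bytes is an assumed metric name)
sum by (stage) (rate(ingest_flows_processed[5m]))
  / sum by (stage) (rate(ingest_batch_size_bytes_count[5m]))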
jotak committed Oct 7, 2022
1 parent f2b6769 commit b8c0090
Showing 4 changed files with 11 additions and 12 deletions.
4 changes: 2 additions & 2 deletions docs/operational-metrics.md
@@ -58,8 +58,8 @@ Each table below provides documentation for an exported flowlogs-pipeline operat
 ### ingest_flows_processed
 | **Name** | ingest_flows_processed |
 |:---|:---|
-| **Description** | Provides number of flows processed, batches processed, and batch size stats (in number of flows) |
-| **Type** | summary |
+| **Description** | Number of flows received by the ingester |
+| **Type** | counter |
 | **Labels** | stage |
 
 
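Since ingest_flows_processed is now a plain counter, per-stage ingest throughput falls out of a standard rate() query; a minimal PromQL sketch, assuming the metric is exported under this exact name:

# flows received per second, per stage, averaged over 5 minutes
sum by (stage) (rate(ingest_flows_processed[5m]))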
4 changes: 2 additions & 2 deletions pkg/pipeline/ingest/ingest_grpc.go
@@ -95,8 +95,8 @@ func instrumentGRPC(m *metrics) grpc2.UnaryServerInterceptor {
 			m.latency.Observe(delay)
 		}
 
-		// instrument batch size distribution (which also instruments total flows counter under the hood)
-		m.batchSize.Observe(float64(len(flowRecords.Entries)))
+		// instrument flows processed counter
+		m.flowsProcessed.Add(float64(len(flowRecords.Entries)))
 
 		// instrument message bytes
 		m.batchSizeBytes.Observe(float64(proto.Size(flowRecords)))
3 changes: 1 addition & 2 deletions pkg/pipeline/ingest/ingest_kafka.go
@@ -79,8 +79,7 @@ func (k *ingestKafka) kafkaListener() {
 			if k.canLogMessages {
 				log.Debugf("string(kafkaMessage) = %s\n", string(kafkaMessage.Value))
 			}
-			// We don't know how many messages were in kafka internal batches, so just increment per-message
-			k.metrics.batchSize.Observe(1)
+			k.metrics.flowsProcessed.Inc()
 			messageLen := len(kafkaMessage.Value)
 			k.metrics.batchSizeBytes.Observe(float64(messageLen) + float64(len(kafkaMessage.Key)))
 			if messageLen > 0 {
12 changes: 6 additions & 6 deletions pkg/pipeline/ingest/metrics.go
@@ -13,10 +13,10 @@ var (
 		operational.TypeHistogram,
 		"stage",
 	)
-	batchSizeSummary = operational.DefineMetric(
-		"ingest_flows_processed", // This is intentionally named to emphasize its utility for flows counting, despite being a batch size distribution
-		"Provides number of flows processed, batches processed, and batch size stats (in number of flows)",
-		operational.TypeSummary,
+	flowsProcessedCounter = operational.DefineMetric(
+		"ingest_flows_processed",
+		"Number of flows received by the ingester",
+		operational.TypeCounter,
 		"stage",
 	)
 	batchSizeBytesSummary = operational.DefineMetric(
@@ -39,7 +39,7 @@ type metrics struct {
 	stageType      string
 	stageDuration  prometheus.Observer
 	latency        prometheus.Histogram
-	batchSize      prometheus.Summary
+	flowsProcessed prometheus.Counter
 	batchSizeBytes prometheus.Summary
 	errors         *prometheus.CounterVec
 }
Expand All @@ -52,7 +52,7 @@ func newMetrics(opMetrics *operational.Metrics, stage, stageType string, inGauge
stageType: stageType,
latency: opMetrics.NewHistogram(&latencyHistogram, []float64{.001, .01, .1, 1, 10, 100, 1000, 10000}, stage),
stageDuration: opMetrics.GetOrCreateStageDurationHisto().WithLabelValues(stage),
batchSize: opMetrics.NewSummary(&batchSizeSummary, stage),
flowsProcessed: opMetrics.NewCounter(&flowsProcessedCounter, stage),
batchSizeBytes: opMetrics.NewSummary(&batchSizeBytesSummary, stage),
errors: opMetrics.NewCounterVec(&errorsCounter),
}
