diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go index 1f838630d262..e048546fb408 100644 --- a/pkg/loghttp/push/push.go +++ b/pkg/loghttp/push/push.go @@ -39,18 +39,18 @@ var ( Namespace: constants.Loki, Name: "distributor_bytes_received_total", Help: "The total number of uncompressed bytes received per tenant. Includes structured metadata bytes.", - }, []string{"tenant", "retention_hours"}) + }, []string{"tenant", "retention_hours", "aggregated_metric"}) structuredMetadataBytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: constants.Loki, Name: "distributor_structured_metadata_bytes_received_total", Help: "The total number of uncompressed bytes received per tenant for entries' structured metadata", - }, []string{"tenant", "retention_hours"}) + }, []string{"tenant", "retention_hours", "aggregated_metric"}) linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: constants.Loki, Name: "distributor_lines_received_total", Help: "The total number of lines received per tenant", - }, []string{"tenant"}) + }, []string{"tenant", "aggregated_metric"}) bytesReceivedStats = analytics.NewCounter("distributor_bytes_received") structuredMetadataBytesReceivedStats = analytics.NewCounter("distributor_structured_metadata_bytes_received") @@ -70,6 +70,7 @@ type TenantsRetention interface { type Limits interface { OTLPConfig(userID string) OTLPConfig + DiscoverServiceName(userID string) []string } type EmptyLimits struct{} @@ -78,6 +79,10 @@ func (EmptyLimits) OTLPConfig(string) OTLPConfig { return DefaultOTLPConfig(GlobalOTLPConfig{}) } +func (EmptyLimits) DiscoverServiceName(string) []string { + return nil +} + type ( RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) RequestParserWrapper func(inner RequestParser) RequestParser @@ -112,25 +117,22 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete structuredMetadataSize int64 ) + isAggregatedMetric := fmt.Sprintf("%t", pushStats.IsAggregatedMetric) + for retentionPeriod, size := range pushStats.LogLinesBytes { retentionHours := RetentionPeriodToString(retentionPeriod) - - if !pushStats.IsAggregatedMetric { - bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(size)) - bytesReceivedStats.Inc(size) - } + bytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric).Add(float64(size)) + bytesReceivedStats.Inc(size) entriesSize += size } for retentionPeriod, size := range pushStats.StructuredMetadataBytes { retentionHours := RetentionPeriodToString(retentionPeriod) - if !pushStats.IsAggregatedMetric { - structuredMetadataBytesIngested.WithLabelValues(userID, retentionHours).Add(float64(size)) - bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(size)) - bytesReceivedStats.Inc(size) - structuredMetadataBytesReceivedStats.Inc(size) - } + structuredMetadataBytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric).Add(float64(size)) + bytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric).Add(float64(size)) + bytesReceivedStats.Inc(size) + structuredMetadataBytesReceivedStats.Inc(size) entriesSize += size structuredMetadataSize += size @@ -138,7 +140,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete // incrementing tenant metrics if we have a tenant. if pushStats.NumLines != 0 && userID != "" { - linesIngested.WithLabelValues(userID).Add(float64(pushStats.NumLines)) + linesIngested.WithLabelValues(userID, isAggregatedMetric).Add(float64(pushStats.NumLines)) } linesReceivedStats.Inc(pushStats.NumLines) @@ -162,7 +164,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete return req, nil } -func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, _ Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) { +func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) { // Body var body io.Reader // bodySize should always reflect the compressed size of the request body @@ -231,21 +233,37 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe pushStats.ContentType = contentType pushStats.ContentEncoding = contentEncoding - for _, s := range req.Streams { + discoverServiceName := limits.DiscoverServiceName(userID) + for i := range req.Streams { + s := req.Streams[i] pushStats.StreamLabelsSize += int64(len(s.Labels)) - var lbs labels.Labels - if tenantsRetention != nil || tracker != nil { - lbs, err = syntax.ParseLabels(s.Labels) - if err != nil { - return nil, nil, fmt.Errorf("couldn't parse labels: %w", err) - } + lbs, err := syntax.ParseLabels(s.Labels) + if err != nil { + return nil, nil, fmt.Errorf("couldn't parse labels: %w", err) } if lbs.Has(AggregatedMetricLabel) { pushStats.IsAggregatedMetric = true } + if !lbs.Has(LabelServiceName) && len(discoverServiceName) > 0 && !pushStats.IsAggregatedMetric { + serviceName := ServiceUnknown + for _, labelName := range discoverServiceName { + if labelVal := lbs.Get(labelName); labelVal != "" { + serviceName = labelVal + break + } + } + + lb := labels.NewBuilder(lbs) + lbs = lb.Set(LabelServiceName, serviceName).Labels() + s.Labels = lbs.String() + + // Remove the added label after it's added to the stream so it's not consumed by subsequent steps + lbs = lb.Del(LabelServiceName).Labels() + } + var retentionPeriod time.Duration if tenantsRetention != nil { retentionPeriod = tenantsRetention.RetentionPeriodFor(userID, lbs) @@ -268,6 +286,8 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe pushStats.MostRecentEntryTimestamp = e.Timestamp } } + + req.Streams[i] = s } return &req, pushStats, nil diff --git a/pkg/loghttp/push/push_test.go b/pkg/loghttp/push/push_test.go index 0484afe31c3b..80e7c5e7eead 100644 --- a/pkg/loghttp/push/push_test.go +++ b/pkg/loghttp/push/push_test.go @@ -60,6 +60,7 @@ func TestParseRequest(t *testing.T) { expectedLines int expectedBytesUsageTracker map[string]float64 expectedLabels labels.Labels + aggregatedMetric bool }{ { path: `/loki/api/v1/push`, @@ -228,6 +229,18 @@ func TestParseRequest(t *testing.T) { expectedBytesUsageTracker: map[string]float64{`{foo="bar2"}`: float64(len("fizzbuss"))}, expectedLabels: labels.FromStrings("foo", "bar2", LabelServiceName, ServiceUnknown), }, + { + path: `/loki/api/v1/push`, + body: `{"streams": [{ "stream": { "__aggregated_metric__": "stuff", "foo": "bar2", "job": "stuff" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`, + contentType: `application/json`, + valid: true, + enableServiceDiscovery: true, + expectedBytes: len("fizzbuzz"), + expectedLines: 1, + expectedBytesUsageTracker: map[string]float64{`{__aggregated_metric__="stuff", foo="bar2", job="stuff"}`: float64(len("fizzbuss"))}, + expectedLabels: labels.FromStrings("__aggregated_metric__", "stuff", "foo", "bar2", "job", "stuff"), + aggregatedMetric: true, + }, } { t.Run(fmt.Sprintf("test %d", index), func(t *testing.T) { structuredMetadataBytesIngested.Reset() @@ -259,9 +272,32 @@ func TestParseRequest(t *testing.T) { require.Equal(t, test.expectedBytes, bytesReceived) require.Equalf(t, tracker.Total(), float64(bytesReceived), "tracked usage bytes must equal bytes received metric") require.Equal(t, test.expectedLines, linesReceived) - require.Equal(t, float64(test.expectedStructuredMetadataBytes), testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues("fake", ""))) - require.Equal(t, float64(test.expectedBytes), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", ""))) - require.Equal(t, float64(test.expectedLines), testutil.ToFloat64(linesIngested.WithLabelValues("fake"))) + require.Equal( + t, + float64(test.expectedStructuredMetadataBytes), + testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues("fake", "", fmt.Sprintf("%t", test.aggregatedMetric))), + ) + require.Equal( + t, + float64(test.expectedBytes), + testutil.ToFloat64( + bytesIngested.WithLabelValues( + "fake", + "", + fmt.Sprintf("%t", test.aggregatedMetric), + ), + ), + ) + require.Equal( + t, + float64(test.expectedLines), + testutil.ToFloat64( + linesIngested.WithLabelValues( + "fake", + fmt.Sprintf("%t", test.aggregatedMetric), + ), + ), + ) require.Equal(t, test.expectedLabels.String(), data.Streams[0].Labels) require.InDeltaMapValuesf(t, test.expectedBytesUsageTracker, tracker.receivedBytes, 0.0, "%s != %s", test.expectedBytesUsageTracker, tracker.receivedBytes) } else { @@ -270,9 +306,9 @@ func TestParseRequest(t *testing.T) { require.Equal(t, 0, structuredMetadataBytesReceived) require.Equal(t, 0, bytesReceived) require.Equal(t, 0, linesReceived) - require.Equal(t, float64(0), testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues("fake", ""))) - require.Equal(t, float64(0), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", ""))) - require.Equal(t, float64(0), testutil.ToFloat64(linesIngested.WithLabelValues("fake"))) + require.Equal(t, float64(0), testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues("fake", "", fmt.Sprintf("%t", test.aggregatedMetric)))) + require.Equal(t, float64(0), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", "", fmt.Sprintf("%t", test.aggregatedMetric)))) + require.Equal(t, float64(0), testutil.ToFloat64(linesIngested.WithLabelValues("fake", fmt.Sprintf("%t", test.aggregatedMetric)))) } }) }