From fbb02f1af544c46b0ac77c7bccb58d89750cff71 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Wed, 27 Mar 2024 14:28:32 +0100 Subject: [PATCH] fix: Track all OTLP metadata bytes in usage tracker. (#12376) Co-authored-by: Vladyslav Diachenko <82767850+vlad-diachenko@users.noreply.github.com> --- pkg/loghttp/push/otlp.go | 8 +++++++ pkg/loghttp/push/otlp_test.go | 39 ++++++++++------------------------- pkg/loghttp/push/push_test.go | 9 ++++++++ 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/pkg/loghttp/push/otlp.go b/pkg/loghttp/push/otlp.go index 8136d6995dc62..a001b52b210f6 100644 --- a/pkg/loghttp/push/otlp.go +++ b/pkg/loghttp/push/otlp.go @@ -151,6 +151,10 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten retentionPeriodForUser := tenantsRetention.RetentionPeriodFor(userID, lbs) stats.StructuredMetadataBytes[retentionPeriodForUser] += int64(resourceAttributesAsStructuredMetadataSize) + if tracker != nil { + tracker.ReceivedBytesAdd(ctx, userID, retentionPeriodForUser, lbs, float64(resourceAttributesAsStructuredMetadataSize)) + } + stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser] = append(stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser], resourceAttributesAsStructuredMetadata...) for j := 0; j < sls.Len(); j++ { @@ -202,6 +206,10 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten scopeAttributesAsStructuredMetadataSize := labelsSize(scopeAttributesAsStructuredMetadata) stats.StructuredMetadataBytes[retentionPeriodForUser] += int64(scopeAttributesAsStructuredMetadataSize) + if tracker != nil { + tracker.ReceivedBytesAdd(ctx, userID, retentionPeriodForUser, lbs, float64(scopeAttributesAsStructuredMetadataSize)) + } + stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser] = append(stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser], scopeAttributesAsStructuredMetadata...) for k := 0; k < logs.Len(); k++ { log := logs.At(k) diff --git a/pkg/loghttp/push/otlp_test.go b/pkg/loghttp/push/otlp_test.go index 5202505fd1bf1..c711c85905cf2 100644 --- a/pkg/loghttp/push/otlp_test.go +++ b/pkg/loghttp/push/otlp_test.go @@ -26,7 +26,6 @@ func TestOTLPToLokiPushRequest(t *testing.T) { expectedPushRequest logproto.PushRequest expectedStats Stats otlpConfig OTLPConfig - tracker UsageTracker }{ { name: "no logs", @@ -129,7 +128,6 @@ func TestOTLPToLokiPushRequest(t *testing.T) { { name: "service.name not defined in resource attributes", otlpConfig: DefaultOTLPConfig(defaultGlobalOTLPConfig), - tracker: NewMockTracker(), generateLogs: func() plog.Logs { ld := plog.NewLogs() ld.ResourceLogs().AppendEmpty().Resource().Attributes().PutStr("service.namespace", "foo") @@ -164,32 +162,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) { }, StreamLabelsSize: 47, MostRecentEntryTimestamp: now, - /* - logLinesBytesCustomTrackers: []customTrackerPair{ - { - Labels: []labels.Label{ - {Name: "service_namespace", Value: "foo"}, - {Name: "tracker", Value: "foo"}, - }, - Bytes: map[time.Duration]int64{ - time.Hour: 9, - }, - }, - }, - structuredMetadataBytesCustomTrackers: []customTrackerPair{ - { - Labels: []labels.Label{ - {Name: "service_namespace", Value: "foo"}, - {Name: "tracker", Value: "foo"}, - }, - Bytes: map[time.Duration]int64{ - time.Hour: 0, - }, - }, - }, - */ }, - //expectedTrackedUsaged: }, { name: "resource attributes and scope attributes stored as structured metadata", @@ -518,9 +491,19 @@ func TestOTLPToLokiPushRequest(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { stats := newPushStats() - pushReq := otlpToLokiPushRequest(context.Background(), tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, tc.tracker, stats) + tracker := NewMockTracker() + pushReq := otlpToLokiPushRequest(context.Background(), tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, tracker, stats) require.Equal(t, tc.expectedPushRequest, *pushReq) require.Equal(t, tc.expectedStats, *stats) + + totalBytes := 0.0 + for _, b := range stats.LogLinesBytes { + totalBytes += float64(b) + } + for _, b := range stats.StructuredMetadataBytes { + totalBytes += float64(b) + } + require.Equal(t, totalBytes, tracker.Total(), "Total tracked bytes must equal total bytes of the stats.") }) } } diff --git a/pkg/loghttp/push/push_test.go b/pkg/loghttp/push/push_test.go index 9f470fc0eb9e4..8c4768615ce6c 100644 --- a/pkg/loghttp/push/push_test.go +++ b/pkg/loghttp/push/push_test.go @@ -226,6 +226,7 @@ func TestParseRequest(t *testing.T) { assert.NotNil(t, data, "Should give data for %d", index) require.Equal(t, test.expectedStructuredMetadataBytes, structuredMetadataBytesReceived) require.Equal(t, test.expectedBytes, bytesReceived) + require.Equalf(t, tracker.Total(), float64(bytesReceived), "tracked usage bytes must equal bytes received metric") require.Equal(t, test.expectedLines, linesReceived) require.Equal(t, float64(test.expectedStructuredMetadataBytes), testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues("fake", ""))) require.Equal(t, float64(test.expectedBytes), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", ""))) @@ -257,6 +258,14 @@ func NewMockTracker() *MockCustomTracker { } } +func (t *MockCustomTracker) Total() float64 { + total := float64(0) + for _, v := range t.receivedBytes { + total += v + } + return total +} + // DiscardedBytesAdd implements CustomTracker. func (t *MockCustomTracker) DiscardedBytesAdd(_ context.Context, _, _ string, labels labels.Labels, value float64) { t.discardedBytes[labels.String()] += value