From d4ba94366a8bd5b4b892835513fe57433485e3e2 Mon Sep 17 00:00:00 2001 From: Sven Grossmann Date: Thu, 5 Sep 2024 16:09:56 +0200 Subject: [PATCH 1/8] remove parsed and metadata labels before parsing again --- pkg/querier/querier.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index cb8086564752..98e4fe69cf03 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -1259,10 +1259,15 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams, queryParsers } parsers := queryParsers - parsedLabels := getParsedLabels(entry) - if len(parsedLabels) == 0 { - parsedLabels, parsers = parseLine(entry.Line, streamLbls) + pl := getParsedLabels(entry) + streamLbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, 0) + for k := range pl { + streamLbls = streamLbls.Del(k) } + for k := range structuredMetadata { + streamLbls = streamLbls.Del(k) + } + parsedLabels, parsers := parseLine(entry.Line, streamLbls) for k, vals := range parsedLabels { df, ok := detectedFields[k] if !ok && fieldCount < limit { @@ -1343,11 +1348,10 @@ func getStructuredMetadata(entry push.Entry) map[string][]string { return result } -func parseLine(line string, streamLbls labels.Labels) (map[string][]string, []string) { +func parseLine(line string, lbls *logql_log.LabelsBuilder) (map[string][]string, []string) { parser := "logfmt" logFmtParser := logql_log.NewLogfmtParser(true, false) - lbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, 0) _, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls) if !logfmtSuccess || lbls.HasErr() { parser = "json" @@ -1362,9 +1366,9 @@ func parseLine(line string, streamLbls labels.Labels) (map[string][]string, []st parsedLabels := map[string]map[string]struct{}{} for _, lbl := range lbls.LabelsResult().Labels() { // skip indexed labels, as we only want detected fields - if streamLbls.Has(lbl.Name) { - continue - } + // if streamLbls.Has(lbl.Name) { + // continue + // } if values, ok := parsedLabels[lbl.Name]; ok { values[lbl.Value] = struct{}{} } else { From 649b664366d4693ca2d8e2b674cb503489ca8a2b Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Thu, 5 Sep 2024 12:26:48 -0600 Subject: [PATCH 2/8] fix: detected fields _extracted logic --- pkg/querier/querier.go | 47 +-- pkg/querier/querier_test.go | 571 ++++++++++++++++++++++++++++++++++++ 2 files changed, 576 insertions(+), 42 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 98e4fe69cf03..b4d7225c9926 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -1102,9 +1102,8 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto. 
if err != nil { return nil, err } - parsers := getParsersFromExpr(expr) - detectedFields := parseDetectedFields(req.FieldLimit, streams, parsers) + detectedFields := parseDetectedFields(req.FieldLimit, streams) fields := make([]*logproto.DetectedField, len(detectedFields)) fieldCount := 0 @@ -1220,7 +1219,7 @@ func determineType(value string) logproto.DetectedFieldType { return logproto.DetectedFieldString } -func parseDetectedFields(limit uint32, streams logqlmodel.Streams, queryParsers []string) map[string]*parsedFields { +func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*parsedFields { detectedFields := make(map[string]*parsedFields, limit) fieldCount := uint32(0) emtpyparsers := []string{} @@ -1258,15 +1257,7 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams, queryParsers } } - parsers := queryParsers - pl := getParsedLabels(entry) - streamLbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, 0) - for k := range pl { - streamLbls = streamLbls.Del(k) - } - for k := range structuredMetadata { - streamLbls = streamLbls.Del(k) - } + streamLbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, streamLbls.Hash()) parsedLabels, parsers := parseLine(entry.Line, streamLbls) for k, vals := range parsedLabels { df, ok := detectedFields[k] @@ -1304,28 +1295,6 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams, queryParsers return detectedFields } -func getParsedLabels(entry push.Entry) map[string][]string { - labels := map[string]map[string]struct{}{} - for _, lbl := range entry.Parsed { - if values, ok := labels[lbl.Name]; ok { - values[lbl.Value] = struct{}{} - } else { - labels[lbl.Name] = map[string]struct{}{lbl.Value: {}} - } - } - - result := make(map[string][]string, len(labels)) - for lbl, values := range labels { - vals := make([]string, 0, len(values)) - for v := range values { - vals = append(vals, v) - } - result[lbl] = vals - } - - return result -} - func getStructuredMetadata(entry push.Entry) map[string][]string { labels := map[string]map[string]struct{}{} for _, lbl := range entry.StructuredMetadata { @@ -1364,11 +1333,8 @@ func parseLine(line string, lbls *logql_log.LabelsBuilder) (map[string][]string, } parsedLabels := map[string]map[string]struct{}{} - for _, lbl := range lbls.LabelsResult().Labels() { - // skip indexed labels, as we only want detected fields - // if streamLbls.Has(lbl.Name) { - // continue - // } + lblsResult := lbls.LabelsResult().Parsed() + for _, lbl := range lblsResult { if values, ok := parsedLabels[lbl.Name]; ok { values[lbl.Value] = struct{}{} } else { @@ -1389,9 +1355,6 @@ func parseLine(line string, lbls *logql_log.LabelsBuilder) (map[string][]string, } // streamsForFieldDetection reads the streams from the iterator and returns them sorted. -// If categorizeLabels is true, the stream labels contains just the stream labels and entries inside each stream have their -// structuredMetadata and parsed fields populated with structured metadata labels plus the parsed labels respectively. -// Otherwise, the stream labels are the whole series labels including the stream labels, structured metadata labels and parsed labels. 
func streamsForFieldDetection(i iter.EntryIterator, size uint32) (logqlmodel.Streams, error) { streams := map[string]*logproto.Stream{} respSize := uint32(0) diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index e542b28247cd..e418507d37d7 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -16,10 +16,12 @@ import ( ring_client "github.com/grafana/dskit/ring/client" "github.com/grafana/dskit/user" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/grafana/loki/pkg/push" util_log "github.com/grafana/loki/v3/pkg/util/log" "github.com/grafana/loki/v3/pkg/compactor/deletion" @@ -27,6 +29,7 @@ import ( "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql" "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/logqlmodel" "github.com/grafana/loki/v3/pkg/querier/plan" "github.com/grafana/loki/v3/pkg/storage" "github.com/grafana/loki/v3/pkg/util/constants" @@ -2096,3 +2099,571 @@ func Test_getParsersFromExpr(t *testing.T) { assert.Equal(t, []string{"logfmt", "json"}, getParsersFromExpr(expr)) }) } + +func Test_parseDetectedFeilds(t *testing.T) { + now := time.Now() + + t.Run("when no parsers are supplied", func(t *testing.T) { + infoDetectdFiledMetadata := []push.LabelAdapter{ + { + Name: "detected_level", + Value: "info", + }, + } + + rulerLines := []push.Entry{ + {Timestamp: now, Line: "ts=2024-09-05T15:36:38.757788067Z caller=grpc_logging.go:66 tenant=2419 level=info method=/cortex.Ingester/Push duration=19.098s msg=gRPC", StructuredMetadata: infoDetectdFiledMetadata}, + {Timestamp: now, Line: "ts=2024-09-05T15:36:38.698375619Z caller=grpc_logging.go:66 tenant=29 level=info method=/cortex.Ingester/Push duration=5.471s msg=gRPC", StructuredMetadata: infoDetectdFiledMetadata}, + {Timestamp: now, Line: "ts=2024-09-05T15:36:38.629424175Z caller=grpc_logging.go:66 tenant=2919 level=info method=/cortex.Ingester/Push duration=29.234s msg=gRPC", StructuredMetadata: infoDetectdFiledMetadata}, + } + + rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler"}` + rulerMetric, err := parser.ParseMetric(rulerLbls) + require.NoError(t, err) + + rulerStream := push.Stream{ + Labels: rulerLbls, + Entries: rulerLines, + Hash: rulerMetric.Hash(), + } + + debugDetectedFieldMetadata := []push.LabelAdapter{ + { + Name: "detected_level", + Value: "debug", + }, + } + + nginxJsonLines := []push.Entry{ + {Timestamp: now, Line: `{"host":"100.117.38.203", "user-identifier":"nader3722", "datetime":"05/Sep/2024:16:13:56 +0000", "method": "PATCH", "request": "/api/loki/v1/push", "protocol":"HTTP/2.0", "status":200, "bytes":9664, "referer": "https://www.seniorbleeding-edge.net/exploit/robust/whiteboard"}`, StructuredMetadata: debugDetectedFieldMetadata}, + {Timestamp: now, Line: `{"host":"66.134.9.30", "user-identifier":"-", "datetime":"05/Sep/2024:16:13:55 +0000", "method": "DELETE", "request": "/api/mimir/v1/push", "protocol":"HTTP/1.1", "status":200, "bytes":18688, "referer": "https://www.districtiterate.biz/synergistic/next-generation/extend"}`, StructuredMetadata: debugDetectedFieldMetadata}, + {Timestamp: now, Line: `{"host":"66.134.9.30", "user-identifier":"-", "datetime":"05/Sep/2024:16:13:55 +0000", "method": "GET", "request": "/api/loki/v1/label/names", "protocol":"HTTP/1.1", "status":200, "bytes":9314, 
"referer": "https://www.dynamicimplement.info/enterprise/distributed/incentivize/strategic"}`, StructuredMetadata: debugDetectedFieldMetadata}, + } + + nginxLbls := `{ cluster="eu-west-1", level="debug", namespace="gateway", pod="nginx-json-oghco", service_name="nginx-json" }` + nginxMetric, err := parser.ParseMetric(nginxLbls) + require.NoError(t, err) + + nginxStream := push.Stream{ + Labels: nginxLbls, + Entries: nginxJsonLines, + Hash: nginxMetric.Hash(), + } + + t.Run("detect logfmt fields when with no supplied parsers", func(t *testing.T) { + df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{rulerStream})) + for _, expected := range []string{"ts", "caller", "tenant", "level", "method", "duration", "msg"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1) + require.Equal(t, "logfmt", parsers[0]) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + + t.Run("detect json fields when with no supplied parsers", func(t *testing.T) { + df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{nginxStream})) + for _, expected := range []string{"host", "user_identifier", "datetime", "method", "request", "protocol", "status", "bytes", "referer"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1) + require.Equal(t, "json", parsers[0]) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + + t.Run("detect mixed fields when with no supplied parsers", func(t *testing.T) { + df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{rulerStream, nginxStream})) + + for _, expected := range []string{"ts", "caller", "tenant", "level", "duration", "msg"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1, "expected only logfmt parser for %s", expected) + require.Equal(t, "logfmt", parsers[0], "expected only logfmt parser for %s", expected) + } + + for _, expected := range []string{"host", "user_identifier", "datetime", "request", "protocol", "status", "bytes", "referer"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1, "expected only json parser for %s", expected) + require.Equal(t, "json", parsers[0], "expected only json parser for %s", expected) + } + + // multiple parsers for fields that exist in both streams + for _, expected := range []string{"method"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 2, "expected logfmt and json parser for %s", expected) + require.Contains(t, parsers, "logfmt", "expected logfmt parser for %s", expected) + require.Contains(t, parsers, "json", "expected json parser for %s", expected) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + + t.Run("correctly applies _extracted for a single stream", func(t *testing.T) { + rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house"}` + rulerMetric, err := 
parser.ParseMetric(rulerLbls) + require.NoError(t, err) + + rulerStream := push.Stream{ + Labels: rulerLbls, + Entries: rulerLines, + Hash: rulerMetric.Hash(), + } + + df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{rulerStream})) + for _, expected := range []string{"ts", "caller_extracted", "tenant_extracted", "level", "method", "duration", "msg"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1) + require.Equal(t, "logfmt", parsers[0]) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + + t.Run("correctly applies _extracted for multiple streams", func(t *testing.T) { + rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house"}` + rulerMetric, err := parser.ParseMetric(rulerLbls) + require.NoError(t, err) + + rulerStream := push.Stream{ + Labels: rulerLbls, + Entries: rulerLines, + Hash: rulerMetric.Hash(), + } + + nginxLbls := `{ cluster="eu-west-1", level="debug", namespace="gateway", pod="nginx-json-oghco", service_name="nginx-json", host="localhost"}` + nginxMetric, err := parser.ParseMetric(nginxLbls) + require.NoError(t, err) + + nginxStream := push.Stream{ + Labels: nginxLbls, + Entries: nginxJsonLines, + Hash: nginxMetric.Hash(), + } + + df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{rulerStream, nginxStream})) + for _, expected := range []string{"ts", "caller_extracted", "tenant_extracted", "level", "duration", "msg"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1) + require.Equal(t, "logfmt", parsers[0]) + } + + for _, expected := range []string{"host_extracted", "user_identifier", "datetime", "request", "protocol", "status", "bytes", "referer"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1, "expected only json parser for %s", expected) + require.Equal(t, "json", parsers[0], "expected only json parser for %s", expected) + } + + // multiple parsers for fields that exist in both streams + for _, expected := range []string{"method"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 2, "expected logfmt and json parser for %s", expected) + require.Contains(t, parsers, "logfmt", "expected logfmt parser for %s", expected) + require.Contains(t, parsers, "json", "expected json parser for %s", expected) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + }) + + t.Run("when parsers are supplied", func(t *testing.T) { + infoDetectdFiledMetadata := []push.LabelAdapter{ + { + Name: "detected_level", + Value: "info", + }, + } + + parsedRulerFields := func(ts, tenant, duration string) []push.LabelAdapter { + return []push.LabelAdapter{ + { + Name: "ts", + Value: ts, + }, + { + Name: "caller", + Value: "grpc_logging.go:66", + }, + { + Name: "tenant", + Value: tenant, + }, + { + Name: "level", + Value: "info", + }, + { + Name: "method", + Value: "/cortex.Ingester/Push", + }, + { + Name: "duration", + Value: duration, + }, + { + Name: "msg", + Value: "gRPC", + }, + } + } + + rulerLines := []push.Entry{ + { + Timestamp: now, + Line: 
"ts=2024-09-05T15:36:38.757788067Z caller=grpc_logging.go:66 tenant=2419 level=info method=/cortex.Ingester/Push duration=19.098s msg=gRPC", + StructuredMetadata: infoDetectdFiledMetadata, + Parsed: parsedRulerFields("2024-09-05T15:36:38.757788067Z", "2419", "19.098s"), + }, + { + Timestamp: now, + Line: "ts=2024-09-05T15:36:38.698375619Z caller=grpc_logging.go:66 tenant=29 level=info method=/cortex.Ingester/Push duration=5.471s msg=gRPC", + StructuredMetadata: infoDetectdFiledMetadata, + Parsed: parsedRulerFields("2024-09-05T15:36:38.698375619Z", "29", "5.471s"), + }, + { + Timestamp: now, + Line: "ts=2024-09-05T15:36:38.629424175Z caller=grpc_logging.go:66 tenant=2919 level=info method=/cortex.Ingester/Push duration=29.234s msg=gRPC", + StructuredMetadata: infoDetectdFiledMetadata, + Parsed: parsedRulerFields("2024-09-05T15:36:38.629424175Z", "2919", "29.234s"), + }, + } + + rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler"}` + rulerMetric, err := parser.ParseMetric(rulerLbls) + require.NoError(t, err) + + rulerStream := push.Stream{ + Labels: rulerLbls, + Entries: rulerLines, + Hash: rulerMetric.Hash(), + } + + debugDetectedFieldMetadata := []push.LabelAdapter{ + { + Name: "detected_level", + Value: "debug", + }, + } + + parsedNginxFields := func(host, userIdentifier, datetime, method, request, protocol, status, bytes, referer string) []push.LabelAdapter { + return []push.LabelAdapter{ + { + Name: "host", + Value: host, + }, + { + Name: "user_identifier", + Value: userIdentifier, + }, + { + Name: "datetime", + Value: datetime, + }, + { + Name: "method", + Value: method, + }, + { + Name: "request", + Value: request, + }, + { + Name: "protocol", + Value: protocol, + }, + { + Name: "status", + Value: status, + }, + { + Name: "bytes", + Value: bytes, + }, + { + Name: "referer", + Value: referer, + }, + } + } + + nginxJsonLines := []push.Entry{ + { + Timestamp: now, + Line: `{"host":"100.117.38.203", "user-identifier":"nader3722", "datetime":"05/Sep/2024:16:13:56 +0000", "method": "PATCH", "request": "/api/loki/v1/push", "protocol":"HTTP/2.0", "status":200, "bytes":9664, "referer": "https://www.seniorbleeding-edge.net/exploit/robust/whiteboard"}`, + StructuredMetadata: debugDetectedFieldMetadata, + Parsed: parsedNginxFields("100.117.38.203", "nadre3722", "05/Sep/2024:16:13:56 +0000", "PATCH", "/api/loki/v1/push", "HTTP/2.0", "200", "9664", "https://www.seniorbleeding-edge.net/exploit/robust/whiteboard"), + }, + { + Timestamp: now, + Line: `{"host":"66.134.9.30", "user-identifier":"-", "datetime":"05/Sep/2024:16:13:55 +0000", "method": "DELETE", "request": "/api/mimir/v1/push", "protocol":"HTTP/1.1", "status":200, "bytes":18688, "referer": "https://www.districtiterate.biz/synergistic/next-generation/extend"}`, + StructuredMetadata: debugDetectedFieldMetadata, + Parsed: parsedNginxFields("66.134.9.30", "-", "05/Sep/2024:16:13:55 +0000", "DELETE", "/api/mimir/v1/push", "HTTP/1.1", "200", "18688", "https://www.districtiterate.biz/synergistic/next-generation/extend"), + }, + { + Timestamp: now, + Line: `{"host":"66.134.9.30", "user-identifier":"-", "datetime":"05/Sep/2024:16:13:55 +0000", "method": "GET", "request": "/api/loki/v1/label/names", "protocol":"HTTP/1.1", "status":200, "bytes":9314, "referer": "https://www.dynamicimplement.info/enterprise/distributed/incentivize/strategic"}`, + StructuredMetadata: debugDetectedFieldMetadata, + Parsed: parsedNginxFields("66.134.9.30", "-", "05/Sep/2024:16:13:55 +0000", "GET", 
"/api/loki/v1/label/names", "HTTP/1.1", "200", "9314", "https://www.dynamicimplement.info/enterprise/distributed/incentivize/strategic"), + }, + } + + nginxLbls := `{ cluster="eu-west-1", level="debug", namespace="gateway", pod="nginx-json-oghco", service_name="nginx-json" }` + nginxMetric, err := parser.ParseMetric(nginxLbls) + require.NoError(t, err) + + nginxStream := push.Stream{ + Labels: nginxLbls, + Entries: nginxJsonLines, + Hash: nginxMetric.Hash(), + } + + t.Run("detect logfmt fields", func(t *testing.T) { + df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{rulerStream})) + for _, expected := range []string{"ts", "caller", "tenant", "level", "method", "duration", "msg"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1) + require.Equal(t, "logfmt", parsers[0]) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + + t.Run("detect json fields", func(t *testing.T) { + df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{nginxStream})) + for _, expected := range []string{"host", "user_identifier", "datetime", "method", "request", "protocol", "status", "bytes", "referer"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1) + require.Equal(t, "json", parsers[0]) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + + t.Run("detect mixed fields", func(t *testing.T) { + df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{rulerStream, nginxStream})) + + for _, expected := range []string{"ts", "caller", "tenant", "level", "duration", "msg"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1, "expected only logfmt parser for %s", expected) + require.Equal(t, "logfmt", parsers[0], "expected only logfmt parser for %s", expected) + } + + for _, expected := range []string{"host", "user_identifier", "datetime", "request", "protocol", "status", "bytes", "referer"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1, "expected only json parser for %s", expected) + require.Equal(t, "json", parsers[0], "expected only json parser for %s", expected) + } + + // multiple parsers for fields that exist in both streams + for _, expected := range []string{"method"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 2, "expected logfmt and json parser for %s", expected) + require.Contains(t, parsers, "logfmt", "expected logfmt parser for %s", expected) + require.Contains(t, parsers, "json", "expected json parser for %s", expected) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + + t.Run("correctly applies _extracted for a single stream", func(t *testing.T) { + rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house"}` + rulerMetric, err := parser.ParseMetric(rulerLbls) + require.NoError(t, err) + + rulerStream := push.Stream{ + Labels: 
rulerLbls, + Entries: rulerLines, + Hash: rulerMetric.Hash(), + } + + df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{rulerStream})) + for _, expected := range []string{"ts", "caller_extracted", "tenant_extracted", "level", "method", "duration", "msg"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1) + require.Equal(t, "logfmt", parsers[0]) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + + t.Run("correctly applies _extracted for multiple streams", func(t *testing.T) { + rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house"}` + rulerMetric, err := parser.ParseMetric(rulerLbls) + require.NoError(t, err) + + rulerStream := push.Stream{ + Labels: rulerLbls, + Entries: rulerLines, + Hash: rulerMetric.Hash(), + } + + nginxLbls := `{ cluster="eu-west-1", level="debug", namespace="gateway", pod="nginx-json-oghco", service_name="nginx-json", host="localhost"}` + nginxMetric, err := parser.ParseMetric(nginxLbls) + require.NoError(t, err) + + nginxStream := push.Stream{ + Labels: nginxLbls, + Entries: nginxJsonLines, + Hash: nginxMetric.Hash(), + } + + df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{rulerStream, nginxStream})) + for _, expected := range []string{"ts", "caller_extracted", "tenant_extracted", "level", "duration", "msg"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1) + require.Equal(t, "logfmt", parsers[0]) + } + + for _, expected := range []string{"host_extracted", "user_identifier", "datetime", "request", "protocol", "status", "bytes", "referer"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1, "expected only json parser for %s", expected) + require.Equal(t, "json", parsers[0], "expected only json parser for %s", expected) + } + + // multiple parsers for fields that exist in both streams + for _, expected := range []string{"method"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 2, "expected logfmt and json parser for %s", expected) + require.Contains(t, parsers, "logfmt", "expected logfmt parser for %s", expected) + require.Contains(t, parsers, "json", "expected json parser for %s", expected) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + }) + + t.Run("handles level in all the places", func(t *testing.T) { + rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house", level="debug"}` + rulerMetric, err := parser.ParseMetric(rulerLbls) + require.NoError(t, err) + + rulerStream := push.Stream{ + Labels: rulerLbls, + Entries: []push.Entry{ + { + Timestamp: now, + Line: "ts=2024-09-05T15:36:38.757788067Z caller=grpc_logging.go:66 tenant=2419 level=info method=/cortex.Ingester/Push duration=19.098s msg=gRPC", + StructuredMetadata: []push.LabelAdapter{ + { + Name: "detected_level", + Value: "debug", + }, + }, + Parsed: []push.LabelAdapter{ + { + Name: "level", + Value: "info", + }, + }, + }, + }, + Hash: rulerMetric.Hash(), 
+ } + + df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{rulerStream, rulerStream})) + + detectedLevelField := df["detected_level"] + require.Len(t, detectedLevelField.parsers, 0) + require.Equal(t, uint64(1), detectedLevelField.sketch.Estimate()) + + levelField := df["level_extracted"] + require.Len(t, levelField.parsers, 1) + require.Contains(t, levelField.parsers, "logfmt") + require.Equal(t, uint64(1), levelField.sketch.Estimate()) + }) +} From 5f232c14459cf1bf2d5c498e4b92303eaad6bd97 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Thu, 5 Sep 2024 13:49:16 -0600 Subject: [PATCH 3/8] fix: filter out parsed and structured metadata from stream lbls --- pkg/querier/querier.go | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index b4d7225c9926..1d87b60cbfcf 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -1370,12 +1370,28 @@ func streamsForFieldDetection(i iter.EntryIterator, size uint32) (logqlmodel.Str // If lastEntry.Unix < 0 this is the first pass through the loop and we should output the line. // Then check to see if the entry is equal to, or past a forward step if lastEntry.Unix() < 0 || shouldOutput { - stream, ok := streams[streamLabels] + allLbls, err := syntax.ParseLabels(streamLabels) + if err != nil { + continue + } + + parsedLbls := logproto.FromLabelAdaptersToLabels(entry.Parsed) + structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata) + + onlyStreamLbls := logql_log.NewBaseLabelsBuilder().ForLabels(allLbls, 0) + allLbls.Range(func(l labels.Label) { + if parsedLbls.Has(l.Name) || structuredMetadata.Has(l.Name) { + onlyStreamLbls.Del(l.Name) + } + }) + + lblStr := onlyStreamLbls.LabelsResult().String() + stream, ok := streams[lblStr] if !ok { stream = &logproto.Stream{ - Labels: streamLabels, + Labels: lblStr, } - streams[streamLabels] = stream + streams[lblStr] = stream } stream.Entries = append(stream.Entries, entry) lastEntry = i.At().Timestamp From 9ec13d3c6418b03202540820b3df27ce4c491772 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Thu, 5 Sep 2024 15:56:44 -0600 Subject: [PATCH 4/8] fix: capture parsed, removed logfmt strict --- pkg/querier/querier.go | 60 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 55 insertions(+), 5 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 1d87b60cbfcf..a04c0e9a5e33 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -1258,7 +1258,7 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*p } streamLbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, streamLbls.Hash()) - parsedLabels, parsers := parseLine(entry.Line, streamLbls) + parsedLabels, parsers := parseEntry(entry, streamLbls) for k, vals := range parsedLabels { df, ok := detectedFields[k] if !ok && fieldCount < limit { @@ -1317,10 +1317,22 @@ func getStructuredMetadata(entry push.Entry) map[string][]string { return result } -func parseLine(line string, lbls *logql_log.LabelsBuilder) (map[string][]string, []string) { - parser := "logfmt" - logFmtParser := logql_log.NewLogfmtParser(true, false) +func parseEntry(entry push.Entry, lbls *logql_log.LabelsBuilder) (map[string][]string, []string) { + logFmtParser := logql_log.NewLogfmtParser(false, false) + + origParsed := getParsedLabels(entry) + parsed := make(map[string][]string, len(origParsed)) + + for lbl, values := range origParsed { + if lbl == logqlmodel.ErrorLabel || lbl 
== logqlmodel.ErrorDetailsLabel || lbl == logqlmodel.PreserveErrorLabel { + continue + } + parsed[lbl] = values + } + + line := entry.Line + parser := "logfmt" _, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls) if !logfmtSuccess || lbls.HasErr() { parser = "json" @@ -1328,11 +1340,24 @@ func parseLine(line string, lbls *logql_log.LabelsBuilder) (map[string][]string, lbls.Reset() _, jsonSuccess := jsonParser.Process(0, []byte(line), lbls) if !jsonSuccess || lbls.HasErr() { - return map[string][]string{}, nil + return parsed, nil } } parsedLabels := map[string]map[string]struct{}{} + for lbl, values := range parsed { + if vals, ok := parsedLabels[lbl]; ok { + for _, value := range values { + vals[value] = struct{}{} + } + } else { + parsedLabels[lbl] = map[string]struct{}{} + for _, value := range values { + parsedLabels[lbl][value] = struct{}{} + } + } + } + lblsResult := lbls.LabelsResult().Parsed() for _, lbl := range lblsResult { if values, ok := parsedLabels[lbl.Name]; ok { @@ -1344,6 +1369,9 @@ func parseLine(line string, lbls *logql_log.LabelsBuilder) (map[string][]string, result := make(map[string][]string, len(parsedLabels)) for lbl, values := range parsedLabels { + if lbl == logqlmodel.ErrorLabel || lbl == logqlmodel.ErrorDetailsLabel || lbl == logqlmodel.PreserveErrorLabel { + continue + } vals := make([]string, 0, len(values)) for v := range values { vals = append(vals, v) @@ -1354,6 +1382,28 @@ func parseLine(line string, lbls *logql_log.LabelsBuilder) (map[string][]string, return result, []string{parser} } +func getParsedLabels(entry push.Entry) map[string][]string { + labels := map[string]map[string]struct{}{} + for _, lbl := range entry.Parsed { + if values, ok := labels[lbl.Name]; ok { + values[lbl.Value] = struct{}{} + } else { + labels[lbl.Name] = map[string]struct{}{lbl.Value: {}} + } + } + + result := make(map[string][]string, len(labels)) + for lbl, values := range labels { + vals := make([]string, 0, len(values)) + for v := range values { + vals = append(vals, v) + } + result[lbl] = vals + } + + return result +} + // streamsForFieldDetection reads the streams from the iterator and returns them sorted. 
func streamsForFieldDetection(i iter.EntryIterator, size uint32) (logqlmodel.Streams, error) { streams := map[string]*logproto.Stream{} From 02c7c7fbdbaff6144ac730140c98e9bc775632f2 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Thu, 5 Sep 2024 17:11:21 -0600 Subject: [PATCH 5/8] fix: switch the order we try parsers need this now that logfmt isn't strict --- pkg/querier/querier.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index a04c0e9a5e33..e357c7741102 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -1332,14 +1332,14 @@ func parseEntry(entry push.Entry, lbls *logql_log.LabelsBuilder) (map[string][]s } line := entry.Line - parser := "logfmt" - _, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls) - if !logfmtSuccess || lbls.HasErr() { - parser = "json" - jsonParser := logql_log.NewJSONParser() + parser := "json" + jsonParser := logql_log.NewJSONParser() + _, jsonSuccess := jsonParser.Process(0, []byte(line), lbls) + if !jsonSuccess || lbls.HasErr() { + parser = "logfmt" lbls.Reset() - _, jsonSuccess := jsonParser.Process(0, []byte(line), lbls) - if !jsonSuccess || lbls.HasErr() { + _, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls) + if !logfmtSuccess || lbls.HasErr() { return parsed, nil } } From b0c5101b3ecdc473c1560e1da9c6a1d813d09fdb Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Fri, 6 Sep 2024 09:50:21 -0600 Subject: [PATCH 6/8] Update pkg/querier/querier_test.go Co-authored-by: Sven Grossmann --- pkg/querier/querier_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index e418507d37d7..2f2e1d9b81bc 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -2134,7 +2134,7 @@ func Test_parseDetectedFeilds(t *testing.T) { }, } - nginxJsonLines := []push.Entry{ + nginxJSONLines := []push.Entry{ {Timestamp: now, Line: `{"host":"100.117.38.203", "user-identifier":"nader3722", "datetime":"05/Sep/2024:16:13:56 +0000", "method": "PATCH", "request": "/api/loki/v1/push", "protocol":"HTTP/2.0", "status":200, "bytes":9664, "referer": "https://www.seniorbleeding-edge.net/exploit/robust/whiteboard"}`, StructuredMetadata: debugDetectedFieldMetadata}, {Timestamp: now, Line: `{"host":"66.134.9.30", "user-identifier":"-", "datetime":"05/Sep/2024:16:13:55 +0000", "method": "DELETE", "request": "/api/mimir/v1/push", "protocol":"HTTP/1.1", "status":200, "bytes":18688, "referer": "https://www.districtiterate.biz/synergistic/next-generation/extend"}`, StructuredMetadata: debugDetectedFieldMetadata}, {Timestamp: now, Line: `{"host":"66.134.9.30", "user-identifier":"-", "datetime":"05/Sep/2024:16:13:55 +0000", "method": "GET", "request": "/api/loki/v1/label/names", "protocol":"HTTP/1.1", "status":200, "bytes":9314, "referer": "https://www.dynamicimplement.info/enterprise/distributed/incentivize/strategic"}`, StructuredMetadata: debugDetectedFieldMetadata}, From d8f53fc27b18ecb3a53c49ead914b3e38d466f52 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Fri, 6 Sep 2024 12:44:06 -0600 Subject: [PATCH 7/8] test: improve realism of mock stream iterator --- pkg/querier/querier.go | 12 +++--- pkg/querier/querier_mock_test.go | 64 +++++++++++++++++++++++--------- 2 files changed, 54 insertions(+), 22 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index e357c7741102..2046763e6973 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ 
-1318,13 +1318,12 @@ func getStructuredMetadata(entry push.Entry) map[string][]string { } func parseEntry(entry push.Entry, lbls *logql_log.LabelsBuilder) (map[string][]string, []string) { - logFmtParser := logql_log.NewLogfmtParser(false, false) - origParsed := getParsedLabels(entry) parsed := make(map[string][]string, len(origParsed)) for lbl, values := range origParsed { - if lbl == logqlmodel.ErrorLabel || lbl == logqlmodel.ErrorDetailsLabel || lbl == logqlmodel.PreserveErrorLabel { + if lbl == logqlmodel.ErrorLabel || lbl == logqlmodel.ErrorDetailsLabel || + lbl == logqlmodel.PreserveErrorLabel { continue } @@ -1336,8 +1335,10 @@ func parseEntry(entry push.Entry, lbls *logql_log.LabelsBuilder) (map[string][]s jsonParser := logql_log.NewJSONParser() _, jsonSuccess := jsonParser.Process(0, []byte(line), lbls) if !jsonSuccess || lbls.HasErr() { - parser = "logfmt" lbls.Reset() + + logFmtParser := logql_log.NewLogfmtParser(false, false) + parser = "logfmt" _, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls) if !logfmtSuccess || lbls.HasErr() { return parsed, nil @@ -1369,7 +1370,8 @@ func parseEntry(entry push.Entry, lbls *logql_log.LabelsBuilder) (map[string][]s result := make(map[string][]string, len(parsedLabels)) for lbl, values := range parsedLabels { - if lbl == logqlmodel.ErrorLabel || lbl == logqlmodel.ErrorDetailsLabel || lbl == logqlmodel.PreserveErrorLabel { + if lbl == logqlmodel.ErrorLabel || lbl == logqlmodel.ErrorDetailsLabel || + lbl == logqlmodel.PreserveErrorLabel { continue } vals := make([]string, 0, len(values)) diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index 7f5cc24ff276..6a5a45b5123d 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/grafana/loki/v3/pkg/logql/log" + "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/pkg/push" @@ -16,6 +17,7 @@ import ( "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/ring" ring_client "github.com/grafana/dskit/ring/client" + logql_log "github.com/grafana/loki/v3/pkg/logql/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -578,27 +580,41 @@ func mockLogfmtStream(from int, quantity int) logproto.Stream { return mockLogfmtStreamWithLabels(from, quantity, `{type="test", name="foo"}`) } -func mockLogfmtStreamWithLabels(_ int, quantity int, labels string) logproto.Stream { +func mockLogfmtStreamWithLabels(_ int, quantity int, lbls string) logproto.Stream { entries := make([]logproto.Entry, 0, quantity) + streamLabels, err := syntax.ParseLabels(lbls) + if err != nil { + streamLabels = labels.EmptyLabels() + } + + lblBuilder := logql_log.NewBaseLabelsBuilder().ForLabels(streamLabels, streamLabels.Hash()) + logFmtParser := logql_log.NewLogfmtParser(false, false) // used for detected fields queries which are always BACKWARD for i := quantity; i > 0; i-- { - entries = append(entries, logproto.Entry{ + line := fmt.Sprintf( + `message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t name=bar`, + i, + i, + (i * 10), + (i * 256), + float32(i*10.0), + (i%2 == 0)) + + entry := logproto.Entry{ Timestamp: time.Unix(int64(i), 0), - Line: fmt.Sprintf( - `message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t name=bar`, - i, - i, - (i * 10), - (i * 256), - float32(i*10.0), - (i%2 == 0)), - }) + Line: line, + } + _, logfmtSuccess := 
logFmtParser.Process(0, []byte(line), lblBuilder) + if logfmtSuccess { + entry.Parsed = logproto.FromLabelsToLabelAdapters(lblBuilder.LabelsResult().Parsed()) + } + entries = append(entries, entry) } return logproto.Stream{ Entries: entries, - Labels: labels, + Labels: lblBuilder.LabelsResult().String(), } } @@ -609,7 +625,7 @@ func mockLogfmtStreamWithStructuredMetadata(from int, quantity int) logproto.Str func mockLogfmtStreamWithLabelsAndStructuredMetadata( from int, quantity int, - labels string, + lbls string, ) logproto.Stream { var entries []logproto.Entry metadata := push.LabelsAdapter{ @@ -626,15 +642,29 @@ func mockLogfmtStreamWithLabelsAndStructuredMetadata( }) } + streamLabels, err := syntax.ParseLabels(lbls) + if err != nil { + streamLabels = labels.EmptyLabels() + } + + lblBuilder := logql_log.NewBaseLabelsBuilder().ForLabels(streamLabels, streamLabels.Hash()) + logFmtParser := logql_log.NewLogfmtParser(false, false) + for i := quantity; i > 0; i-- { - entries = append(entries, logproto.Entry{ + line := fmt.Sprintf(`message="line %d" count=%d fake=true`, i, i) + entry := logproto.Entry{ Timestamp: time.Unix(int64(i), 0), - Line: fmt.Sprintf(`message="line %d" count=%d fake=true`, i, i), + Line: line, StructuredMetadata: metadata, - }) + } + _, logfmtSuccess := logFmtParser.Process(0, []byte(line), lblBuilder) + if logfmtSuccess { + entry.Parsed = logproto.FromLabelsToLabelAdapters(lblBuilder.LabelsResult().Parsed()) + } + entries = append(entries, entry) } return logproto.Stream{ - Labels: labels, + Labels: lbls, Entries: entries, } } From b353d326f476fd900a9a6ba3c45282b9a57cc34f Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Fri, 6 Sep 2024 12:52:13 -0600 Subject: [PATCH 8/8] ci: lint --- pkg/querier/querier_test.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 2f2e1d9b81bc..d7dbaa61e564 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -21,9 +21,10 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "github.com/grafana/loki/pkg/push" util_log "github.com/grafana/loki/v3/pkg/util/log" + "github.com/grafana/loki/pkg/push" + "github.com/grafana/loki/v3/pkg/compactor/deletion" "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/logproto" @@ -2146,7 +2147,7 @@ func Test_parseDetectedFeilds(t *testing.T) { nginxStream := push.Stream{ Labels: nginxLbls, - Entries: nginxJsonLines, + Entries: nginxJSONLines, Hash: nginxMetric.Hash(), } @@ -2272,7 +2273,7 @@ func Test_parseDetectedFeilds(t *testing.T) { nginxStream := push.Stream{ Labels: nginxLbls, - Entries: nginxJsonLines, + Entries: nginxJSONLines, Hash: nginxMetric.Hash(), } @@ -2433,7 +2434,7 @@ func Test_parseDetectedFeilds(t *testing.T) { } } - nginxJsonLines := []push.Entry{ + nginxJSONLines := []push.Entry{ { Timestamp: now, Line: `{"host":"100.117.38.203", "user-identifier":"nader3722", "datetime":"05/Sep/2024:16:13:56 +0000", "method": "PATCH", "request": "/api/loki/v1/push", "protocol":"HTTP/2.0", "status":200, "bytes":9664, "referer": "https://www.seniorbleeding-edge.net/exploit/robust/whiteboard"}`, @@ -2460,7 +2461,7 @@ func Test_parseDetectedFeilds(t *testing.T) { nginxStream := push.Stream{ Labels: nginxLbls, - Entries: nginxJsonLines, + Entries: nginxJSONLines, Hash: nginxMetric.Hash(), } @@ -2586,7 +2587,7 @@ func Test_parseDetectedFeilds(t *testing.T) { nginxStream := push.Stream{ Labels: nginxLbls, - Entries: 
nginxJsonLines, + Entries: nginxJSONLines, Hash: nginxMetric.Hash(), }
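
---

Note on the parser ordering in this series: patch 4 drops strict mode from the
logfmt parser (NewLogfmtParser(false, false)), and a non-strict logfmt parser
will "successfully" tokenize almost any line, including a JSON object. That is
why patch 5 tries the JSON parser first and falls back to logfmt only when the
JSON parse fails or sets an error on the builder. Below is a minimal,
self-contained sketch of that ordering, using only the logql_log calls that
appear in the hunks above; detectParser itself is a hypothetical helper for
illustration, not a function added by this series.

    package main

    import (
    	"fmt"

    	logql_log "github.com/grafana/loki/v3/pkg/logql/log"
    	"github.com/prometheus/prometheus/model/labels"
    )

    // detectParser is a hypothetical helper mirroring the JSON-first ordering
    // from patch 5: try JSON, and only fall back to the non-strict logfmt
    // parser when the JSON parse fails or flags an error.
    func detectParser(line string, streamLbls labels.Labels) (string, labels.Labels) {
    	// Seed the builder with the stream labels, as parseDetectedFields does.
    	lbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, streamLbls.Hash())

    	// JSON must run first: NewLogfmtParser(false, false) is non-strict and
    	// would otherwise claim success on JSON lines too.
    	if _, ok := logql_log.NewJSONParser().Process(0, []byte(line), lbls); ok && !lbls.HasErr() {
    		return "json", lbls.LabelsResult().Parsed()
    	}

    	// Clear any error left behind by the failed JSON attempt before retrying.
    	lbls.Reset()
    	if _, ok := logql_log.NewLogfmtParser(false, false).Process(0, []byte(line), lbls); ok && !lbls.HasErr() {
    		return "logfmt", lbls.LabelsResult().Parsed()
    	}

    	return "", nil
    }

    func main() {
    	parser, parsed := detectParser(
    		`ts=2024-09-05T15:36:38.757788067Z level=info duration=19.098s msg=gRPC`,
    		labels.EmptyLabels(),
    	)
    	fmt.Println(parser, parsed) // expected: logfmt, with ts/level/duration/msg extracted
    }

The _extracted behavior exercised by the tests in patches 2 and 8 falls out of
seeding the builder with the stream labels via ForLabels: when a parsed key
such as caller collides with an indexed stream label, LabelsResult().Parsed()
reports it as caller_extracted rather than clobbering the stream label, which
is what the "correctly applies _extracted" test cases assert.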