diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index cb8086564752..2046763e6973 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -1102,9 +1102,8 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto. if err != nil { return nil, err } - parsers := getParsersFromExpr(expr) - detectedFields := parseDetectedFields(req.FieldLimit, streams, parsers) + detectedFields := parseDetectedFields(req.FieldLimit, streams) fields := make([]*logproto.DetectedField, len(detectedFields)) fieldCount := 0 @@ -1220,7 +1219,7 @@ func determineType(value string) logproto.DetectedFieldType { return logproto.DetectedFieldString } -func parseDetectedFields(limit uint32, streams logqlmodel.Streams, queryParsers []string) map[string]*parsedFields { +func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*parsedFields { detectedFields := make(map[string]*parsedFields, limit) fieldCount := uint32(0) emtpyparsers := []string{} @@ -1258,11 +1257,8 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams, queryParsers } } - parsers := queryParsers - parsedLabels := getParsedLabels(entry) - if len(parsedLabels) == 0 { - parsedLabels, parsers = parseLine(entry.Line, streamLbls) - } + streamLbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, streamLbls.Hash()) + parsedLabels, parsers := parseEntry(entry, streamLbls) for k, vals := range parsedLabels { df, ok := detectedFields[k] if !ok && fieldCount < limit { @@ -1299,9 +1295,9 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams, queryParsers return detectedFields } -func getParsedLabels(entry push.Entry) map[string][]string { +func getStructuredMetadata(entry push.Entry) map[string][]string { labels := map[string]map[string]struct{}{} - for _, lbl := range entry.Parsed { + for _, lbl := range entry.StructuredMetadata { if values, ok := labels[lbl.Name]; ok { values[lbl.Value] = struct{}{} } else { @@ -1321,50 +1317,50 @@ func getParsedLabels(entry push.Entry) map[string][]string { return result } -func getStructuredMetadata(entry push.Entry) map[string][]string { - labels := map[string]map[string]struct{}{} - for _, lbl := range entry.StructuredMetadata { - if values, ok := labels[lbl.Name]; ok { - values[lbl.Value] = struct{}{} - } else { - labels[lbl.Name] = map[string]struct{}{lbl.Value: {}} - } - } +func parseEntry(entry push.Entry, lbls *logql_log.LabelsBuilder) (map[string][]string, []string) { + origParsed := getParsedLabels(entry) + parsed := make(map[string][]string, len(origParsed)) - result := make(map[string][]string, len(labels)) - for lbl, values := range labels { - vals := make([]string, 0, len(values)) - for v := range values { - vals = append(vals, v) + for lbl, values := range origParsed { + if lbl == logqlmodel.ErrorLabel || lbl == logqlmodel.ErrorDetailsLabel || + lbl == logqlmodel.PreserveErrorLabel { + continue } - result[lbl] = vals - } - - return result -} -func parseLine(line string, streamLbls labels.Labels) (map[string][]string, []string) { - parser := "logfmt" - logFmtParser := logql_log.NewLogfmtParser(true, false) + parsed[lbl] = values + } - lbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, 0) - _, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls) - if !logfmtSuccess || lbls.HasErr() { - parser = "json" - jsonParser := logql_log.NewJSONParser() + line := entry.Line + parser := "json" + jsonParser := logql_log.NewJSONParser() + _, jsonSuccess := jsonParser.Process(0, []byte(line), lbls) + if 
!jsonSuccess || lbls.HasErr() { lbls.Reset() - _, jsonSuccess := jsonParser.Process(0, []byte(line), lbls) - if !jsonSuccess || lbls.HasErr() { - return map[string][]string{}, nil + + logFmtParser := logql_log.NewLogfmtParser(false, false) + parser = "logfmt" + _, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls) + if !logfmtSuccess || lbls.HasErr() { + return parsed, nil } } parsedLabels := map[string]map[string]struct{}{} - for _, lbl := range lbls.LabelsResult().Labels() { - // skip indexed labels, as we only want detected fields - if streamLbls.Has(lbl.Name) { - continue + for lbl, values := range parsed { + if vals, ok := parsedLabels[lbl]; ok { + for _, value := range values { + vals[value] = struct{}{} + } + } else { + parsedLabels[lbl] = map[string]struct{}{} + for _, value := range values { + parsedLabels[lbl][value] = struct{}{} + } } + } + + lblsResult := lbls.LabelsResult().Parsed() + for _, lbl := range lblsResult { if values, ok := parsedLabels[lbl.Name]; ok { values[lbl.Value] = struct{}{} } else { @@ -1374,6 +1370,10 @@ func parseLine(line string, streamLbls labels.Labels) (map[string][]string, []st result := make(map[string][]string, len(parsedLabels)) for lbl, values := range parsedLabels { + if lbl == logqlmodel.ErrorLabel || lbl == logqlmodel.ErrorDetailsLabel || + lbl == logqlmodel.PreserveErrorLabel { + continue + } vals := make([]string, 0, len(values)) for v := range values { vals = append(vals, v) @@ -1384,10 +1384,29 @@ func parseLine(line string, streamLbls labels.Labels) (map[string][]string, []st return result, []string{parser} } +func getParsedLabels(entry push.Entry) map[string][]string { + labels := map[string]map[string]struct{}{} + for _, lbl := range entry.Parsed { + if values, ok := labels[lbl.Name]; ok { + values[lbl.Value] = struct{}{} + } else { + labels[lbl.Name] = map[string]struct{}{lbl.Value: {}} + } + } + + result := make(map[string][]string, len(labels)) + for lbl, values := range labels { + vals := make([]string, 0, len(values)) + for v := range values { + vals = append(vals, v) + } + result[lbl] = vals + } + + return result +} + // streamsForFieldDetection reads the streams from the iterator and returns them sorted. -// If categorizeLabels is true, the stream labels contains just the stream labels and entries inside each stream have their -// structuredMetadata and parsed fields populated with structured metadata labels plus the parsed labels respectively. -// Otherwise, the stream labels are the whole series labels including the stream labels, structured metadata labels and parsed labels. func streamsForFieldDetection(i iter.EntryIterator, size uint32) (logqlmodel.Streams, error) { streams := map[string]*logproto.Stream{} respSize := uint32(0) @@ -1403,12 +1422,28 @@ func streamsForFieldDetection(i iter.EntryIterator, size uint32) (logqlmodel.Str // If lastEntry.Unix < 0 this is the first pass through the loop and we should output the line. 
// Then check to see if the entry is equal to, or past a forward step if lastEntry.Unix() < 0 || shouldOutput { - stream, ok := streams[streamLabels] + allLbls, err := syntax.ParseLabels(streamLabels) + if err != nil { + continue + } + + parsedLbls := logproto.FromLabelAdaptersToLabels(entry.Parsed) + structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata) + + onlyStreamLbls := logql_log.NewBaseLabelsBuilder().ForLabels(allLbls, 0) + allLbls.Range(func(l labels.Label) { + if parsedLbls.Has(l.Name) || structuredMetadata.Has(l.Name) { + onlyStreamLbls.Del(l.Name) + } + }) + + lblStr := onlyStreamLbls.LabelsResult().String() + stream, ok := streams[lblStr] if !ok { stream = &logproto.Stream{ - Labels: streamLabels, + Labels: lblStr, } - streams[streamLabels] = stream + streams[lblStr] = stream } stream.Entries = append(stream.Entries, entry) lastEntry = i.At().Timestamp diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index 7f5cc24ff276..6a5a45b5123d 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/grafana/loki/v3/pkg/logql/log" + "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/pkg/push" @@ -16,6 +17,7 @@ import ( "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/ring" ring_client "github.com/grafana/dskit/ring/client" + logql_log "github.com/grafana/loki/v3/pkg/logql/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -578,27 +580,41 @@ func mockLogfmtStream(from int, quantity int) logproto.Stream { return mockLogfmtStreamWithLabels(from, quantity, `{type="test", name="foo"}`) } -func mockLogfmtStreamWithLabels(_ int, quantity int, labels string) logproto.Stream { +func mockLogfmtStreamWithLabels(_ int, quantity int, lbls string) logproto.Stream { entries := make([]logproto.Entry, 0, quantity) + streamLabels, err := syntax.ParseLabels(lbls) + if err != nil { + streamLabels = labels.EmptyLabels() + } + + lblBuilder := logql_log.NewBaseLabelsBuilder().ForLabels(streamLabels, streamLabels.Hash()) + logFmtParser := logql_log.NewLogfmtParser(false, false) // used for detected fields queries which are always BACKWARD for i := quantity; i > 0; i-- { - entries = append(entries, logproto.Entry{ + line := fmt.Sprintf( + `message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t name=bar`, + i, + i, + (i * 10), + (i * 256), + float32(i*10.0), + (i%2 == 0)) + + entry := logproto.Entry{ Timestamp: time.Unix(int64(i), 0), - Line: fmt.Sprintf( - `message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t name=bar`, - i, - i, - (i * 10), - (i * 256), - float32(i*10.0), - (i%2 == 0)), - }) + Line: line, + } + _, logfmtSuccess := logFmtParser.Process(0, []byte(line), lblBuilder) + if logfmtSuccess { + entry.Parsed = logproto.FromLabelsToLabelAdapters(lblBuilder.LabelsResult().Parsed()) + } + entries = append(entries, entry) } return logproto.Stream{ Entries: entries, - Labels: labels, + Labels: lblBuilder.LabelsResult().String(), } } @@ -609,7 +625,7 @@ func mockLogfmtStreamWithStructuredMetadata(from int, quantity int) logproto.Str func mockLogfmtStreamWithLabelsAndStructuredMetadata( from int, quantity int, - labels string, + lbls string, ) logproto.Stream { var entries []logproto.Entry metadata := push.LabelsAdapter{ @@ -626,15 +642,29 @@ func mockLogfmtStreamWithLabelsAndStructuredMetadata( }) } + 
streamLabels, err := syntax.ParseLabels(lbls)
+	if err != nil {
+		streamLabels = labels.EmptyLabels()
+	}
+
+	lblBuilder := logql_log.NewBaseLabelsBuilder().ForLabels(streamLabels, streamLabels.Hash())
+	logFmtParser := logql_log.NewLogfmtParser(false, false)
+
 	for i := quantity; i > 0; i-- {
-		entries = append(entries, logproto.Entry{
+		line := fmt.Sprintf(`message="line %d" count=%d fake=true`, i, i)
+		entry := logproto.Entry{
 			Timestamp:          time.Unix(int64(i), 0),
-			Line:               fmt.Sprintf(`message="line %d" count=%d fake=true`, i, i),
+			Line:               line,
 			StructuredMetadata: metadata,
-		})
+		}
+		_, logfmtSuccess := logFmtParser.Process(0, []byte(line), lblBuilder)
+		if logfmtSuccess {
+			entry.Parsed = logproto.FromLabelsToLabelAdapters(lblBuilder.LabelsResult().Parsed())
+		}
+		entries = append(entries, entry)
 	}
 	return logproto.Stream{
-		Labels:  labels,
+		Labels:  lbls,
 		Entries: entries,
 	}
 }
diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go
index e542b28247cd..d7dbaa61e564 100644
--- a/pkg/querier/querier_test.go
+++ b/pkg/querier/querier_test.go
@@ -16,17 +16,21 @@ import (
 	ring_client "github.com/grafana/dskit/ring/client"
 	"github.com/grafana/dskit/user"
 	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/promql/parser"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 
 	util_log "github.com/grafana/loki/v3/pkg/util/log"
 
+	"github.com/grafana/loki/pkg/push"
+
 	"github.com/grafana/loki/v3/pkg/compactor/deletion"
 	"github.com/grafana/loki/v3/pkg/ingester/client"
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logql"
 	"github.com/grafana/loki/v3/pkg/logql/syntax"
+	"github.com/grafana/loki/v3/pkg/logqlmodel"
 	"github.com/grafana/loki/v3/pkg/querier/plan"
 	"github.com/grafana/loki/v3/pkg/storage"
 	"github.com/grafana/loki/v3/pkg/util/constants"
@@ -2096,3 +2100,571 @@ func Test_getParsersFromExpr(t *testing.T) {
 		assert.Equal(t, []string{"logfmt", "json"}, getParsersFromExpr(expr))
 	})
 }
+
+func Test_parseDetectedFields(t *testing.T) {
+	now := time.Now()
+
+	t.Run("when no parsers are supplied", func(t *testing.T) {
+		infoDetectedFieldMetadata := []push.LabelAdapter{
+			{
+				Name:  "detected_level",
+				Value: "info",
+			},
+		}
+
+		rulerLines := []push.Entry{
+			{Timestamp: now, Line: "ts=2024-09-05T15:36:38.757788067Z caller=grpc_logging.go:66 tenant=2419 level=info method=/cortex.Ingester/Push duration=19.098s msg=gRPC", StructuredMetadata: infoDetectedFieldMetadata},
+			{Timestamp: now, Line: "ts=2024-09-05T15:36:38.698375619Z caller=grpc_logging.go:66 tenant=29 level=info method=/cortex.Ingester/Push duration=5.471s msg=gRPC", StructuredMetadata: infoDetectedFieldMetadata},
+			{Timestamp: now, Line: "ts=2024-09-05T15:36:38.629424175Z caller=grpc_logging.go:66 tenant=2919 level=info method=/cortex.Ingester/Push duration=29.234s msg=gRPC", StructuredMetadata: infoDetectedFieldMetadata},
+		}
+
+		rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler"}`
+		rulerMetric, err := parser.ParseMetric(rulerLbls)
+		require.NoError(t, err)
+
+		rulerStream := push.Stream{
+			Labels:  rulerLbls,
+			Entries: rulerLines,
+			Hash:    rulerMetric.Hash(),
+		}
+
+		debugDetectedFieldMetadata := []push.LabelAdapter{
+			{
+				Name:  "detected_level",
+				Value: "debug",
+			},
+		}
+
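+		// nginx-style JSON access log lines; the assertions below expect
+		// each of these fields to be detected with the json parser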
"/api/loki/v1/push", "protocol":"HTTP/2.0", "status":200, "bytes":9664, "referer": "https://www.seniorbleeding-edge.net/exploit/robust/whiteboard"}`, StructuredMetadata: debugDetectedFieldMetadata}, + {Timestamp: now, Line: `{"host":"66.134.9.30", "user-identifier":"-", "datetime":"05/Sep/2024:16:13:55 +0000", "method": "DELETE", "request": "/api/mimir/v1/push", "protocol":"HTTP/1.1", "status":200, "bytes":18688, "referer": "https://www.districtiterate.biz/synergistic/next-generation/extend"}`, StructuredMetadata: debugDetectedFieldMetadata}, + {Timestamp: now, Line: `{"host":"66.134.9.30", "user-identifier":"-", "datetime":"05/Sep/2024:16:13:55 +0000", "method": "GET", "request": "/api/loki/v1/label/names", "protocol":"HTTP/1.1", "status":200, "bytes":9314, "referer": "https://www.dynamicimplement.info/enterprise/distributed/incentivize/strategic"}`, StructuredMetadata: debugDetectedFieldMetadata}, + } + + nginxLbls := `{ cluster="eu-west-1", level="debug", namespace="gateway", pod="nginx-json-oghco", service_name="nginx-json" }` + nginxMetric, err := parser.ParseMetric(nginxLbls) + require.NoError(t, err) + + nginxStream := push.Stream{ + Labels: nginxLbls, + Entries: nginxJSONLines, + Hash: nginxMetric.Hash(), + } + + t.Run("detect logfmt fields when with no supplied parsers", func(t *testing.T) { + df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{rulerStream})) + for _, expected := range []string{"ts", "caller", "tenant", "level", "method", "duration", "msg"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1) + require.Equal(t, "logfmt", parsers[0]) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + + t.Run("detect json fields when with no supplied parsers", func(t *testing.T) { + df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{nginxStream})) + for _, expected := range []string{"host", "user_identifier", "datetime", "method", "request", "protocol", "status", "bytes", "referer"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1) + require.Equal(t, "json", parsers[0]) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + + t.Run("detect mixed fields when with no supplied parsers", func(t *testing.T) { + df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{rulerStream, nginxStream})) + + for _, expected := range []string{"ts", "caller", "tenant", "level", "duration", "msg"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1, "expected only logfmt parser for %s", expected) + require.Equal(t, "logfmt", parsers[0], "expected only logfmt parser for %s", expected) + } + + for _, expected := range []string{"host", "user_identifier", "datetime", "request", "protocol", "status", "bytes", "referer"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1, "expected only json parser for %s", expected) + require.Equal(t, "json", parsers[0], "expected only json parser for %s", expected) + } + + // multiple parsers for fields that exist in both streams + for _, expected := range []string{"method"} { + require.Contains(t, df, 
+		t.Run("correctly applies _extracted for a single stream", func(t *testing.T) {
+			rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house"}`
+			rulerMetric, err := parser.ParseMetric(rulerLbls)
+			require.NoError(t, err)
+
+			rulerStream := push.Stream{
+				Labels:  rulerLbls,
+				Entries: rulerLines,
+				Hash:    rulerMetric.Hash(),
+			}
+
+			df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{rulerStream}))
+			for _, expected := range []string{"ts", "caller_extracted", "tenant_extracted", "level", "method", "duration", "msg"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1)
+				require.Equal(t, "logfmt", parsers[0])
+			}
+
+			// no parsers for structured metadata
+			for _, expected := range []string{"detected_level"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 0)
+			}
+		})
+
+		t.Run("correctly applies _extracted for multiple streams", func(t *testing.T) {
+			rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house"}`
+			rulerMetric, err := parser.ParseMetric(rulerLbls)
+			require.NoError(t, err)
+
+			rulerStream := push.Stream{
+				Labels:  rulerLbls,
+				Entries: rulerLines,
+				Hash:    rulerMetric.Hash(),
+			}
+
+			nginxLbls := `{ cluster="eu-west-1", level="debug", namespace="gateway", pod="nginx-json-oghco", service_name="nginx-json", host="localhost"}`
+			nginxMetric, err := parser.ParseMetric(nginxLbls)
+			require.NoError(t, err)
+
+			nginxStream := push.Stream{
+				Labels:  nginxLbls,
+				Entries: nginxJSONLines,
+				Hash:    nginxMetric.Hash(),
+			}
+
+			df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{rulerStream, nginxStream}))
+			for _, expected := range []string{"ts", "caller_extracted", "tenant_extracted", "level", "duration", "msg"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1)
+				require.Equal(t, "logfmt", parsers[0])
+			}
+
+			for _, expected := range []string{"host_extracted", "user_identifier", "datetime", "request", "protocol", "status", "bytes", "referer"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1, "expected only json parser for %s", expected)
+				require.Equal(t, "json", parsers[0], "expected only json parser for %s", expected)
+			}
+
+			// multiple parsers for fields that exist in both streams
+			for _, expected := range []string{"method"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 2, "expected logfmt and json parser for %s", expected)
+				require.Contains(t, parsers, "logfmt", "expected logfmt parser for %s", expected)
+				require.Contains(t, parsers, "json", "expected json parser for %s", expected)
+			}
+
+			// no parsers for structured metadata
+			for _, expected := range []string{"detected_level"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 0)
+			}
+		})
+	})
+
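+	// the same scenarios, but entries now arrive with Parsed already
+	// populated, as they would when the query itself ran a parser stage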
+	t.Run("when parsers are supplied", func(t *testing.T) {
+		infoDetectedFieldMetadata := []push.LabelAdapter{
+			{
+				Name:  "detected_level",
+				Value: "info",
+			},
+		}
+
+		parsedRulerFields := func(ts, tenant, duration string) []push.LabelAdapter {
+			return []push.LabelAdapter{
+				{
+					Name:  "ts",
+					Value: ts,
+				},
+				{
+					Name:  "caller",
+					Value: "grpc_logging.go:66",
+				},
+				{
+					Name:  "tenant",
+					Value: tenant,
+				},
+				{
+					Name:  "level",
+					Value: "info",
+				},
+				{
+					Name:  "method",
+					Value: "/cortex.Ingester/Push",
+				},
+				{
+					Name:  "duration",
+					Value: duration,
+				},
+				{
+					Name:  "msg",
+					Value: "gRPC",
+				},
+			}
+		}
+
+		rulerLines := []push.Entry{
+			{
+				Timestamp:          now,
+				Line:               "ts=2024-09-05T15:36:38.757788067Z caller=grpc_logging.go:66 tenant=2419 level=info method=/cortex.Ingester/Push duration=19.098s msg=gRPC",
+				StructuredMetadata: infoDetectedFieldMetadata,
+				Parsed:             parsedRulerFields("2024-09-05T15:36:38.757788067Z", "2419", "19.098s"),
+			},
+			{
+				Timestamp:          now,
+				Line:               "ts=2024-09-05T15:36:38.698375619Z caller=grpc_logging.go:66 tenant=29 level=info method=/cortex.Ingester/Push duration=5.471s msg=gRPC",
+				StructuredMetadata: infoDetectedFieldMetadata,
+				Parsed:             parsedRulerFields("2024-09-05T15:36:38.698375619Z", "29", "5.471s"),
+			},
+			{
+				Timestamp:          now,
+				Line:               "ts=2024-09-05T15:36:38.629424175Z caller=grpc_logging.go:66 tenant=2919 level=info method=/cortex.Ingester/Push duration=29.234s msg=gRPC",
+				StructuredMetadata: infoDetectedFieldMetadata,
+				Parsed:             parsedRulerFields("2024-09-05T15:36:38.629424175Z", "2919", "29.234s"),
+			},
+		}
+
+		rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler"}`
+		rulerMetric, err := parser.ParseMetric(rulerLbls)
+		require.NoError(t, err)
+
+		rulerStream := push.Stream{
+			Labels:  rulerLbls,
+			Entries: rulerLines,
+			Hash:    rulerMetric.Hash(),
+		}
+
+		debugDetectedFieldMetadata := []push.LabelAdapter{
+			{
+				Name:  "detected_level",
+				Value: "debug",
+			},
+		}
+
+		parsedNginxFields := func(host, userIdentifier, datetime, method, request, protocol, status, bytes, referer string) []push.LabelAdapter {
+			return []push.LabelAdapter{
+				{
+					Name:  "host",
+					Value: host,
+				},
+				{
+					Name:  "user_identifier",
+					Value: userIdentifier,
+				},
+				{
+					Name:  "datetime",
+					Value: datetime,
+				},
+				{
+					Name:  "method",
+					Value: method,
+				},
+				{
+					Name:  "request",
+					Value: request,
+				},
+				{
+					Name:  "protocol",
+					Value: protocol,
+				},
+				{
+					Name:  "status",
+					Value: status,
+				},
+				{
+					Name:  "bytes",
+					Value: bytes,
+				},
+				{
+					Name:  "referer",
+					Value: referer,
+				},
+			}
+		}
+
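+		// the same nginx lines as above, with Parsed pre-populated to
+		// match what the json parser would extract from each line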
"https://www.districtiterate.biz/synergistic/next-generation/extend"}`, + StructuredMetadata: debugDetectedFieldMetadata, + Parsed: parsedNginxFields("66.134.9.30", "-", "05/Sep/2024:16:13:55 +0000", "DELETE", "/api/mimir/v1/push", "HTTP/1.1", "200", "18688", "https://www.districtiterate.biz/synergistic/next-generation/extend"), + }, + { + Timestamp: now, + Line: `{"host":"66.134.9.30", "user-identifier":"-", "datetime":"05/Sep/2024:16:13:55 +0000", "method": "GET", "request": "/api/loki/v1/label/names", "protocol":"HTTP/1.1", "status":200, "bytes":9314, "referer": "https://www.dynamicimplement.info/enterprise/distributed/incentivize/strategic"}`, + StructuredMetadata: debugDetectedFieldMetadata, + Parsed: parsedNginxFields("66.134.9.30", "-", "05/Sep/2024:16:13:55 +0000", "GET", "/api/loki/v1/label/names", "HTTP/1.1", "200", "9314", "https://www.dynamicimplement.info/enterprise/distributed/incentivize/strategic"), + }, + } + + nginxLbls := `{ cluster="eu-west-1", level="debug", namespace="gateway", pod="nginx-json-oghco", service_name="nginx-json" }` + nginxMetric, err := parser.ParseMetric(nginxLbls) + require.NoError(t, err) + + nginxStream := push.Stream{ + Labels: nginxLbls, + Entries: nginxJSONLines, + Hash: nginxMetric.Hash(), + } + + t.Run("detect logfmt fields", func(t *testing.T) { + df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{rulerStream})) + for _, expected := range []string{"ts", "caller", "tenant", "level", "method", "duration", "msg"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1) + require.Equal(t, "logfmt", parsers[0]) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + + t.Run("detect json fields", func(t *testing.T) { + df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{nginxStream})) + for _, expected := range []string{"host", "user_identifier", "datetime", "method", "request", "protocol", "status", "bytes", "referer"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1) + require.Equal(t, "json", parsers[0]) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + + t.Run("detect mixed fields", func(t *testing.T) { + df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{rulerStream, nginxStream})) + + for _, expected := range []string{"ts", "caller", "tenant", "level", "duration", "msg"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1, "expected only logfmt parser for %s", expected) + require.Equal(t, "logfmt", parsers[0], "expected only logfmt parser for %s", expected) + } + + for _, expected := range []string{"host", "user_identifier", "datetime", "request", "protocol", "status", "bytes", "referer"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1, "expected only json parser for %s", expected) + require.Equal(t, "json", parsers[0], "expected only json parser for %s", expected) + } + + // multiple parsers for fields that exist in both streams + for _, expected := range []string{"method"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 
2, "expected logfmt and json parser for %s", expected) + require.Contains(t, parsers, "logfmt", "expected logfmt parser for %s", expected) + require.Contains(t, parsers, "json", "expected json parser for %s", expected) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + + t.Run("correctly applies _extracted for a single stream", func(t *testing.T) { + rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house"}` + rulerMetric, err := parser.ParseMetric(rulerLbls) + require.NoError(t, err) + + rulerStream := push.Stream{ + Labels: rulerLbls, + Entries: rulerLines, + Hash: rulerMetric.Hash(), + } + + df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{rulerStream})) + for _, expected := range []string{"ts", "caller_extracted", "tenant_extracted", "level", "method", "duration", "msg"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1) + require.Equal(t, "logfmt", parsers[0]) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + + t.Run("correctly applies _extracted for multiple streams", func(t *testing.T) { + rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house"}` + rulerMetric, err := parser.ParseMetric(rulerLbls) + require.NoError(t, err) + + rulerStream := push.Stream{ + Labels: rulerLbls, + Entries: rulerLines, + Hash: rulerMetric.Hash(), + } + + nginxLbls := `{ cluster="eu-west-1", level="debug", namespace="gateway", pod="nginx-json-oghco", service_name="nginx-json", host="localhost"}` + nginxMetric, err := parser.ParseMetric(nginxLbls) + require.NoError(t, err) + + nginxStream := push.Stream{ + Labels: nginxLbls, + Entries: nginxJSONLines, + Hash: nginxMetric.Hash(), + } + + df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{rulerStream, nginxStream})) + for _, expected := range []string{"ts", "caller_extracted", "tenant_extracted", "level", "duration", "msg"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1) + require.Equal(t, "logfmt", parsers[0]) + } + + for _, expected := range []string{"host_extracted", "user_identifier", "datetime", "request", "protocol", "status", "bytes", "referer"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1, "expected only json parser for %s", expected) + require.Equal(t, "json", parsers[0], "expected only json parser for %s", expected) + } + + // multiple parsers for fields that exist in both streams + for _, expected := range []string{"method"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 2, "expected logfmt and json parser for %s", expected) + require.Contains(t, parsers, "logfmt", "expected logfmt parser for %s", expected) + require.Contains(t, parsers, "json", "expected json parser for %s", expected) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) 
+ }) + + t.Run("handles level in all the places", func(t *testing.T) { + rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house", level="debug"}` + rulerMetric, err := parser.ParseMetric(rulerLbls) + require.NoError(t, err) + + rulerStream := push.Stream{ + Labels: rulerLbls, + Entries: []push.Entry{ + { + Timestamp: now, + Line: "ts=2024-09-05T15:36:38.757788067Z caller=grpc_logging.go:66 tenant=2419 level=info method=/cortex.Ingester/Push duration=19.098s msg=gRPC", + StructuredMetadata: []push.LabelAdapter{ + { + Name: "detected_level", + Value: "debug", + }, + }, + Parsed: []push.LabelAdapter{ + { + Name: "level", + Value: "info", + }, + }, + }, + }, + Hash: rulerMetric.Hash(), + } + + df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{rulerStream, rulerStream})) + + detectedLevelField := df["detected_level"] + require.Len(t, detectedLevelField.parsers, 0) + require.Equal(t, uint64(1), detectedLevelField.sketch.Estimate()) + + levelField := df["level_extracted"] + require.Len(t, levelField.parsers, 1) + require.Contains(t, levelField.parsers, "logfmt") + require.Equal(t, uint64(1), levelField.sketch.Estimate()) + }) +}