Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: correct _extracted logic in detected fields #14064

Merged
merged 9 commits into from
Sep 6, 2024
135 changes: 85 additions & 50 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1102,9 +1102,8 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
if err != nil {
return nil, err
}
parsers := getParsersFromExpr(expr)

detectedFields := parseDetectedFields(req.FieldLimit, streams, parsers)
detectedFields := parseDetectedFields(req.FieldLimit, streams)

fields := make([]*logproto.DetectedField, len(detectedFields))
fieldCount := 0
Expand Down Expand Up @@ -1220,7 +1219,7 @@ func determineType(value string) logproto.DetectedFieldType {
return logproto.DetectedFieldString
}

func parseDetectedFields(limit uint32, streams logqlmodel.Streams, queryParsers []string) map[string]*parsedFields {
func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*parsedFields {
detectedFields := make(map[string]*parsedFields, limit)
fieldCount := uint32(0)
emtpyparsers := []string{}
Expand Down Expand Up @@ -1258,11 +1257,8 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams, queryParsers
}
}

parsers := queryParsers
parsedLabels := getParsedLabels(entry)
if len(parsedLabels) == 0 {
parsedLabels, parsers = parseLine(entry.Line, streamLbls)
}
streamLbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, streamLbls.Hash())
parsedLabels, parsers := parseEntry(entry, streamLbls)
for k, vals := range parsedLabels {
df, ok := detectedFields[k]
if !ok && fieldCount < limit {
Expand Down Expand Up @@ -1299,9 +1295,9 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams, queryParsers
return detectedFields
}

func getParsedLabels(entry push.Entry) map[string][]string {
func getStructuredMetadata(entry push.Entry) map[string][]string {
labels := map[string]map[string]struct{}{}
for _, lbl := range entry.Parsed {
for _, lbl := range entry.StructuredMetadata {
if values, ok := labels[lbl.Name]; ok {
values[lbl.Value] = struct{}{}
} else {
Expand All @@ -1321,50 +1317,50 @@ func getParsedLabels(entry push.Entry) map[string][]string {
return result
}

func getStructuredMetadata(entry push.Entry) map[string][]string {
labels := map[string]map[string]struct{}{}
for _, lbl := range entry.StructuredMetadata {
if values, ok := labels[lbl.Name]; ok {
values[lbl.Value] = struct{}{}
} else {
labels[lbl.Name] = map[string]struct{}{lbl.Value: {}}
}
}
func parseEntry(entry push.Entry, lbls *logql_log.LabelsBuilder) (map[string][]string, []string) {
origParsed := getParsedLabels(entry)
parsed := make(map[string][]string, len(origParsed))

result := make(map[string][]string, len(labels))
for lbl, values := range labels {
vals := make([]string, 0, len(values))
for v := range values {
vals = append(vals, v)
for lbl, values := range origParsed {
if lbl == logqlmodel.ErrorLabel || lbl == logqlmodel.ErrorDetailsLabel ||
lbl == logqlmodel.PreserveErrorLabel {
continue
}
result[lbl] = vals
}

return result
}

func parseLine(line string, streamLbls labels.Labels) (map[string][]string, []string) {
parser := "logfmt"
logFmtParser := logql_log.NewLogfmtParser(true, false)
parsed[lbl] = values
}

lbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, 0)
_, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls)
if !logfmtSuccess || lbls.HasErr() {
parser = "json"
jsonParser := logql_log.NewJSONParser()
line := entry.Line
parser := "json"
jsonParser := logql_log.NewJSONParser()
_, jsonSuccess := jsonParser.Process(0, []byte(line), lbls)
if !jsonSuccess || lbls.HasErr() {
lbls.Reset()
_, jsonSuccess := jsonParser.Process(0, []byte(line), lbls)
if !jsonSuccess || lbls.HasErr() {
return map[string][]string{}, nil

logFmtParser := logql_log.NewLogfmtParser(false, false)
parser = "logfmt"
_, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls)
if !logfmtSuccess || lbls.HasErr() {
return parsed, nil
}
}

parsedLabels := map[string]map[string]struct{}{}
for _, lbl := range lbls.LabelsResult().Labels() {
// skip indexed labels, as we only want detected fields
if streamLbls.Has(lbl.Name) {
continue
for lbl, values := range parsed {
if vals, ok := parsedLabels[lbl]; ok {
for _, value := range values {
vals[value] = struct{}{}
}
} else {
parsedLabels[lbl] = map[string]struct{}{}
for _, value := range values {
parsedLabels[lbl][value] = struct{}{}
}
}
}

lblsResult := lbls.LabelsResult().Parsed()
for _, lbl := range lblsResult {
if values, ok := parsedLabels[lbl.Name]; ok {
values[lbl.Value] = struct{}{}
} else {
Expand All @@ -1374,6 +1370,10 @@ func parseLine(line string, streamLbls labels.Labels) (map[string][]string, []st

result := make(map[string][]string, len(parsedLabels))
for lbl, values := range parsedLabels {
if lbl == logqlmodel.ErrorLabel || lbl == logqlmodel.ErrorDetailsLabel ||
lbl == logqlmodel.PreserveErrorLabel {
continue
}
vals := make([]string, 0, len(values))
for v := range values {
vals = append(vals, v)
Expand All @@ -1384,10 +1384,29 @@ func parseLine(line string, streamLbls labels.Labels) (map[string][]string, []st
return result, []string{parser}
}

// getParsedLabels groups the labels in entry.Parsed by label name and returns,
// for each name, the distinct values observed for it.
//
// Duplicate (name, value) pairs are collapsed. The order of the values within
// each returned slice follows map iteration and is therefore unspecified.
func getParsedLabels(entry push.Entry) map[string][]string {
	// First pass: build a set of values per label name.
	seen := make(map[string]map[string]struct{})
	for _, l := range entry.Parsed {
		set, found := seen[l.Name]
		if !found {
			set = make(map[string]struct{}, 1)
			seen[l.Name] = set
		}
		set[l.Value] = struct{}{}
	}

	// Second pass: flatten each value set into a slice.
	out := make(map[string][]string, len(seen))
	for name, set := range seen {
		values := make([]string, 0, len(set))
		for value := range set {
			values = append(values, value)
		}
		out[name] = values
	}

	return out
}

// streamsForFieldDetection reads the streams from the iterator and returns them sorted.
// If categorizeLabels is true, the stream labels contain just the stream labels and entries inside each stream have their
// structuredMetadata and parsed fields populated with structured metadata labels plus the parsed labels respectively.
// Otherwise, the stream labels are the whole series labels including the stream labels, structured metadata labels and parsed labels.
func streamsForFieldDetection(i iter.EntryIterator, size uint32) (logqlmodel.Streams, error) {
streams := map[string]*logproto.Stream{}
respSize := uint32(0)
Expand All @@ -1403,12 +1422,28 @@ func streamsForFieldDetection(i iter.EntryIterator, size uint32) (logqlmodel.Str
// If lastEntry.Unix < 0 this is the first pass through the loop and we should output the line.
// Then check to see if the entry is equal to, or past a forward step
if lastEntry.Unix() < 0 || shouldOutput {
stream, ok := streams[streamLabels]
allLbls, err := syntax.ParseLabels(streamLabels)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic added here extracts the stream labels by removing the additional labels that come from parsing and structured metadata. This likely needs a test.

if err != nil {
continue
}

parsedLbls := logproto.FromLabelAdaptersToLabels(entry.Parsed)
structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)

onlyStreamLbls := logql_log.NewBaseLabelsBuilder().ForLabels(allLbls, 0)
allLbls.Range(func(l labels.Label) {
if parsedLbls.Has(l.Name) || structuredMetadata.Has(l.Name) {
onlyStreamLbls.Del(l.Name)
}
})

lblStr := onlyStreamLbls.LabelsResult().String()
stream, ok := streams[lblStr]
if !ok {
stream = &logproto.Stream{
Labels: streamLabels,
Labels: lblStr,
}
streams[streamLabels] = stream
streams[lblStr] = stream
}
stream.Entries = append(stream.Entries, entry)
lastEntry = i.At().Timestamp
Expand Down
64 changes: 47 additions & 17 deletions pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/logql/syntax"

"github.com/grafana/loki/pkg/push"

Expand All @@ -16,6 +17,7 @@ import (
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
logql_log "github.com/grafana/loki/v3/pkg/logql/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -578,27 +580,41 @@ func mockLogfmtStream(from int, quantity int) logproto.Stream {
return mockLogfmtStreamWithLabels(from, quantity, `{type="test", name="foo"}`)
}

func mockLogfmtStreamWithLabels(_ int, quantity int, labels string) logproto.Stream {
func mockLogfmtStreamWithLabels(_ int, quantity int, lbls string) logproto.Stream {
entries := make([]logproto.Entry, 0, quantity)
streamLabels, err := syntax.ParseLabels(lbls)
if err != nil {
streamLabels = labels.EmptyLabels()
}

lblBuilder := logql_log.NewBaseLabelsBuilder().ForLabels(streamLabels, streamLabels.Hash())
logFmtParser := logql_log.NewLogfmtParser(false, false)

// used for detected fields queries which are always BACKWARD
for i := quantity; i > 0; i-- {
entries = append(entries, logproto.Entry{
line := fmt.Sprintf(
`message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t name=bar`,
i,
i,
(i * 10),
(i * 256),
float32(i*10.0),
(i%2 == 0))

entry := logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf(
`message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t name=bar`,
i,
i,
(i * 10),
(i * 256),
float32(i*10.0),
(i%2 == 0)),
})
Line: line,
}
_, logfmtSuccess := logFmtParser.Process(0, []byte(line), lblBuilder)
if logfmtSuccess {
entry.Parsed = logproto.FromLabelsToLabelAdapters(lblBuilder.LabelsResult().Parsed())
}
entries = append(entries, entry)
}

return logproto.Stream{
Entries: entries,
Labels: labels,
Labels: lblBuilder.LabelsResult().String(),
}
}

Expand All @@ -609,7 +625,7 @@ func mockLogfmtStreamWithStructuredMetadata(from int, quantity int) logproto.Str
func mockLogfmtStreamWithLabelsAndStructuredMetadata(
from int,
quantity int,
labels string,
lbls string,
) logproto.Stream {
var entries []logproto.Entry
metadata := push.LabelsAdapter{
Expand All @@ -626,15 +642,29 @@ func mockLogfmtStreamWithLabelsAndStructuredMetadata(
})
}

streamLabels, err := syntax.ParseLabels(lbls)
if err != nil {
streamLabels = labels.EmptyLabels()
}

lblBuilder := logql_log.NewBaseLabelsBuilder().ForLabels(streamLabels, streamLabels.Hash())
logFmtParser := logql_log.NewLogfmtParser(false, false)

for i := quantity; i > 0; i-- {
entries = append(entries, logproto.Entry{
line := fmt.Sprintf(`message="line %d" count=%d fake=true`, i, i)
entry := logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf(`message="line %d" count=%d fake=true`, i, i),
Line: line,
StructuredMetadata: metadata,
})
}
_, logfmtSuccess := logFmtParser.Process(0, []byte(line), lblBuilder)
if logfmtSuccess {
entry.Parsed = logproto.FromLabelsToLabelAdapters(lblBuilder.LabelsResult().Parsed())
}
entries = append(entries, entry)
}
return logproto.Stream{
Labels: labels,
Labels: lbls,
Entries: entries,
}
}
Expand Down
Loading
Loading