Skip to content

Commit

Permalink
fix: correct _extracted logic in detected fields (#14064)
Browse files Browse the repository at this point in the history
Co-authored-by: Sven Grossmann <svennergr@gmail.com>
  • Loading branch information
trevorwhitney and svennergr authored Sep 6, 2024
1 parent 18cef21 commit 1b3ba53
Show file tree
Hide file tree
Showing 3 changed files with 704 additions and 67 deletions.
135 changes: 85 additions & 50 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1102,9 +1102,8 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
if err != nil {
return nil, err
}
parsers := getParsersFromExpr(expr)

detectedFields := parseDetectedFields(req.FieldLimit, streams, parsers)
detectedFields := parseDetectedFields(req.FieldLimit, streams)

fields := make([]*logproto.DetectedField, len(detectedFields))
fieldCount := 0
Expand Down Expand Up @@ -1220,7 +1219,7 @@ func determineType(value string) logproto.DetectedFieldType {
return logproto.DetectedFieldString
}

func parseDetectedFields(limit uint32, streams logqlmodel.Streams, queryParsers []string) map[string]*parsedFields {
func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*parsedFields {
detectedFields := make(map[string]*parsedFields, limit)
fieldCount := uint32(0)
emtpyparsers := []string{}
Expand Down Expand Up @@ -1258,11 +1257,8 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams, queryParsers
}
}

parsers := queryParsers
parsedLabels := getParsedLabels(entry)
if len(parsedLabels) == 0 {
parsedLabels, parsers = parseLine(entry.Line, streamLbls)
}
streamLbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, streamLbls.Hash())
parsedLabels, parsers := parseEntry(entry, streamLbls)
for k, vals := range parsedLabels {
df, ok := detectedFields[k]
if !ok && fieldCount < limit {
Expand Down Expand Up @@ -1299,9 +1295,9 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams, queryParsers
return detectedFields
}

func getParsedLabels(entry push.Entry) map[string][]string {
func getStructuredMetadata(entry push.Entry) map[string][]string {
labels := map[string]map[string]struct{}{}
for _, lbl := range entry.Parsed {
for _, lbl := range entry.StructuredMetadata {
if values, ok := labels[lbl.Name]; ok {
values[lbl.Value] = struct{}{}
} else {
Expand All @@ -1321,50 +1317,50 @@ func getParsedLabels(entry push.Entry) map[string][]string {
return result
}

func getStructuredMetadata(entry push.Entry) map[string][]string {
labels := map[string]map[string]struct{}{}
for _, lbl := range entry.StructuredMetadata {
if values, ok := labels[lbl.Name]; ok {
values[lbl.Value] = struct{}{}
} else {
labels[lbl.Name] = map[string]struct{}{lbl.Value: {}}
}
}
func parseEntry(entry push.Entry, lbls *logql_log.LabelsBuilder) (map[string][]string, []string) {
origParsed := getParsedLabels(entry)
parsed := make(map[string][]string, len(origParsed))

result := make(map[string][]string, len(labels))
for lbl, values := range labels {
vals := make([]string, 0, len(values))
for v := range values {
vals = append(vals, v)
for lbl, values := range origParsed {
if lbl == logqlmodel.ErrorLabel || lbl == logqlmodel.ErrorDetailsLabel ||
lbl == logqlmodel.PreserveErrorLabel {
continue
}
result[lbl] = vals
}

return result
}

func parseLine(line string, streamLbls labels.Labels) (map[string][]string, []string) {
parser := "logfmt"
logFmtParser := logql_log.NewLogfmtParser(true, false)
parsed[lbl] = values
}

lbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, 0)
_, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls)
if !logfmtSuccess || lbls.HasErr() {
parser = "json"
jsonParser := logql_log.NewJSONParser()
line := entry.Line
parser := "json"
jsonParser := logql_log.NewJSONParser()
_, jsonSuccess := jsonParser.Process(0, []byte(line), lbls)
if !jsonSuccess || lbls.HasErr() {
lbls.Reset()
_, jsonSuccess := jsonParser.Process(0, []byte(line), lbls)
if !jsonSuccess || lbls.HasErr() {
return map[string][]string{}, nil

logFmtParser := logql_log.NewLogfmtParser(false, false)
parser = "logfmt"
_, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls)
if !logfmtSuccess || lbls.HasErr() {
return parsed, nil
}
}

parsedLabels := map[string]map[string]struct{}{}
for _, lbl := range lbls.LabelsResult().Labels() {
// skip indexed labels, as we only want detected fields
if streamLbls.Has(lbl.Name) {
continue
for lbl, values := range parsed {
if vals, ok := parsedLabels[lbl]; ok {
for _, value := range values {
vals[value] = struct{}{}
}
} else {
parsedLabels[lbl] = map[string]struct{}{}
for _, value := range values {
parsedLabels[lbl][value] = struct{}{}
}
}
}

lblsResult := lbls.LabelsResult().Parsed()
for _, lbl := range lblsResult {
if values, ok := parsedLabels[lbl.Name]; ok {
values[lbl.Value] = struct{}{}
} else {
Expand All @@ -1374,6 +1370,10 @@ func parseLine(line string, streamLbls labels.Labels) (map[string][]string, []st

result := make(map[string][]string, len(parsedLabels))
for lbl, values := range parsedLabels {
if lbl == logqlmodel.ErrorLabel || lbl == logqlmodel.ErrorDetailsLabel ||
lbl == logqlmodel.PreserveErrorLabel {
continue
}
vals := make([]string, 0, len(values))
for v := range values {
vals = append(vals, v)
Expand All @@ -1384,10 +1384,29 @@ func parseLine(line string, streamLbls labels.Labels) (map[string][]string, []st
return result, []string{parser}
}

// getParsedLabels collects the parsed labels attached to an entry and
// returns a map from label name to the de-duplicated list of values
// observed for that name.
func getParsedLabels(entry push.Entry) map[string][]string {
	// Track values per label name as a set so duplicates collapse.
	valueSets := make(map[string]map[string]struct{})
	for _, lbl := range entry.Parsed {
		set, ok := valueSets[lbl.Name]
		if !ok {
			set = map[string]struct{}{}
			valueSets[lbl.Name] = set
		}
		set[lbl.Value] = struct{}{}
	}

	// Flatten each value set into a plain slice for the result.
	result := make(map[string][]string, len(valueSets))
	for name, set := range valueSets {
		values := make([]string, 0, len(set))
		for v := range set {
			values = append(values, v)
		}
		result[name] = values
	}

	return result
}

// streamsForFieldDetection reads the streams from the iterator and returns them sorted.
// If categorizeLabels is true, the stream labels contain just the stream labels, and the entries inside each stream have
// their structuredMetadata and parsed fields populated with the structured metadata labels and the parsed labels, respectively.
// Otherwise, the stream labels are the whole series labels, including the stream labels, structured metadata labels, and parsed labels.
func streamsForFieldDetection(i iter.EntryIterator, size uint32) (logqlmodel.Streams, error) {
streams := map[string]*logproto.Stream{}
respSize := uint32(0)
Expand All @@ -1403,12 +1422,28 @@ func streamsForFieldDetection(i iter.EntryIterator, size uint32) (logqlmodel.Str
// If lastEntry.Unix < 0 this is the first pass through the loop and we should output the line.
// Then check to see if the entry is equal to, or past a forward step
if lastEntry.Unix() < 0 || shouldOutput {
stream, ok := streams[streamLabels]
allLbls, err := syntax.ParseLabels(streamLabels)
if err != nil {
continue
}

parsedLbls := logproto.FromLabelAdaptersToLabels(entry.Parsed)
structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)

onlyStreamLbls := logql_log.NewBaseLabelsBuilder().ForLabels(allLbls, 0)
allLbls.Range(func(l labels.Label) {
if parsedLbls.Has(l.Name) || structuredMetadata.Has(l.Name) {
onlyStreamLbls.Del(l.Name)
}
})

lblStr := onlyStreamLbls.LabelsResult().String()
stream, ok := streams[lblStr]
if !ok {
stream = &logproto.Stream{
Labels: streamLabels,
Labels: lblStr,
}
streams[streamLabels] = stream
streams[lblStr] = stream
}
stream.Entries = append(stream.Entries, entry)
lastEntry = i.At().Timestamp
Expand Down
64 changes: 47 additions & 17 deletions pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/logql/syntax"

"github.com/grafana/loki/pkg/push"

Expand All @@ -16,6 +17,7 @@ import (
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
logql_log "github.com/grafana/loki/v3/pkg/logql/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -578,27 +580,41 @@ func mockLogfmtStream(from int, quantity int) logproto.Stream {
return mockLogfmtStreamWithLabels(from, quantity, `{type="test", name="foo"}`)
}

func mockLogfmtStreamWithLabels(_ int, quantity int, labels string) logproto.Stream {
func mockLogfmtStreamWithLabels(_ int, quantity int, lbls string) logproto.Stream {
entries := make([]logproto.Entry, 0, quantity)
streamLabels, err := syntax.ParseLabels(lbls)
if err != nil {
streamLabels = labels.EmptyLabels()
}

lblBuilder := logql_log.NewBaseLabelsBuilder().ForLabels(streamLabels, streamLabels.Hash())
logFmtParser := logql_log.NewLogfmtParser(false, false)

// used for detected fields queries which are always BACKWARD
for i := quantity; i > 0; i-- {
entries = append(entries, logproto.Entry{
line := fmt.Sprintf(
`message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t name=bar`,
i,
i,
(i * 10),
(i * 256),
float32(i*10.0),
(i%2 == 0))

entry := logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf(
`message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t name=bar`,
i,
i,
(i * 10),
(i * 256),
float32(i*10.0),
(i%2 == 0)),
})
Line: line,
}
_, logfmtSuccess := logFmtParser.Process(0, []byte(line), lblBuilder)
if logfmtSuccess {
entry.Parsed = logproto.FromLabelsToLabelAdapters(lblBuilder.LabelsResult().Parsed())
}
entries = append(entries, entry)
}

return logproto.Stream{
Entries: entries,
Labels: labels,
Labels: lblBuilder.LabelsResult().String(),
}
}

Expand All @@ -609,7 +625,7 @@ func mockLogfmtStreamWithStructuredMetadata(from int, quantity int) logproto.Str
func mockLogfmtStreamWithLabelsAndStructuredMetadata(
from int,
quantity int,
labels string,
lbls string,
) logproto.Stream {
var entries []logproto.Entry
metadata := push.LabelsAdapter{
Expand All @@ -626,15 +642,29 @@ func mockLogfmtStreamWithLabelsAndStructuredMetadata(
})
}

streamLabels, err := syntax.ParseLabels(lbls)
if err != nil {
streamLabels = labels.EmptyLabels()
}

lblBuilder := logql_log.NewBaseLabelsBuilder().ForLabels(streamLabels, streamLabels.Hash())
logFmtParser := logql_log.NewLogfmtParser(false, false)

for i := quantity; i > 0; i-- {
entries = append(entries, logproto.Entry{
line := fmt.Sprintf(`message="line %d" count=%d fake=true`, i, i)
entry := logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf(`message="line %d" count=%d fake=true`, i, i),
Line: line,
StructuredMetadata: metadata,
})
}
_, logfmtSuccess := logFmtParser.Process(0, []byte(line), lblBuilder)
if logfmtSuccess {
entry.Parsed = logproto.FromLabelsToLabelAdapters(lblBuilder.LabelsResult().Parsed())
}
entries = append(entries, entry)
}
return logproto.Stream{
Labels: labels,
Labels: lbls,
Entries: entries,
}
}
Expand Down
Loading

0 comments on commit 1b3ba53

Please sign in to comment.