diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index d594c8bbb289..3c3e07b64dae 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -53,6 +53,14 @@ func (Streams) String() string { return "" } +func (streams Streams) lines() int64 { + var res int64 + for _, s := range streams { + res += int64(len(s.Entries)) + } + return res +} + // Result is the result of a query execution. type Result struct { Data promql_parser.Value @@ -159,7 +167,7 @@ func (q *query) Exec(ctx context.Context) (Result, error) { } if q.record { - RecordMetrics(ctx, q.params, status, statResult) + RecordMetrics(ctx, q.params, status, statResult, data) } return Result{ @@ -293,7 +301,6 @@ func (q *query) evalLiteral(_ context.Context, expr *literalExpr) (promql_parser } return PopulateMatrixFromScalar(s, q.params), nil - } func PopulateMatrixFromScalar(data promql.Scalar, params Params) promql.Matrix { diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index d992691aeb7d..cf4b0cd4fbda 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + promql_parser "github.com/prometheus/prometheus/promql/parser" "github.com/grafana/loki/pkg/logql/stats" ) @@ -64,21 +65,28 @@ var ( }) ) -func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Result) { - logger := util_log.WithContext(ctx, util_log.Logger) +func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Result, result promql_parser.Value) { + var ( + logger = util_log.WithContext(ctx, util_log.Logger) + rt = string(GetRangeType(p)) + latencyType = latencyTypeFast + returnedLines = 0 + ) queryType, err := QueryType(p.Query()) if err != nil { level.Warn(logger).Log("msg", "error parsing query type", "err", err) } - rt := string(GetRangeType(p)) // Tag throughput metric by latency type based on a threshold. // Latency below the threshold is fast, above is slow. - latencyType := latencyTypeFast if stats.Summary.ExecTime > slowQueryThresholdSecond { latencyType = latencyTypeSlow } + if result != nil && result.Type() == ValueTypeStreams { + returnedLines = int(result.(Streams).lines()) + } + // we also log queries, useful for troubleshooting slow queries. level.Info(logger).Log( "latency", latencyType, // this can be used to filter log lines. @@ -89,6 +97,8 @@ func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Res "step", p.Step(), "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), "status", status, + "limit", p.Limit(), + "returned_lines", returnedLines, "throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1), "total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1), ) diff --git a/pkg/logql/metrics_test.go b/pkg/logql/metrics_test.go index dd4d42d11a69..f920005687de 100644 --- a/pkg/logql/metrics_test.go +++ b/pkg/logql/metrics_test.go @@ -73,10 +73,10 @@ func TestLogSlowQuery(t *testing.T) { ExecTime: 25.25, TotalBytesProcessed: 100000, }, - }) + }, Streams{logproto.Stream{Entries: make([]logproto.Entry, 10)}}) require.Equal(t, fmt.Sprintf( - "level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 throughput=100kB total_bytes=100kB\n", + "level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB\n", sp.Context().(jaeger.SpanContext).SpanID().String(), ), buf.String()) diff --git a/pkg/querier/queryrange/stats.go b/pkg/querier/queryrange/stats.go index 85538adf019b..83db56fdb52f 100644 --- a/pkg/querier/queryrange/stats.go +++ b/pkg/querier/queryrange/stats.go @@ -12,6 +12,7 @@ import ( "github.com/cortexproject/cortex/pkg/querier/queryrange" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/go-kit/kit/log/level" + promql_parser "github.com/prometheus/prometheus/promql/parser" "github.com/weaveworks/common/middleware" "github.com/grafana/loki/pkg/logql" @@ -23,27 +24,31 @@ type ctxKeyType string const ctxKey ctxKeyType = "stats" var ( - defaultMetricRecorder = metricRecorderFn(func(ctx context.Context, p logql.Params, status string, stats stats.Result) { - logql.RecordMetrics(ctx, p, status, stats) + defaultMetricRecorder = metricRecorderFn(func(data *queryData) { + logql.RecordMetrics(data.ctx, data.params, data.status, *data.statistics, data.result) }) // StatsHTTPMiddleware is an http middleware to record stats for query_range filter. StatsHTTPMiddleware middleware.Interface = statsHTTPMiddleware(defaultMetricRecorder) ) type metricRecorder interface { - Record(ctx context.Context, p logql.Params, status string, stats stats.Result) + Record(data *queryData) } -type metricRecorderFn func(ctx context.Context, p logql.Params, status string, stats stats.Result) +type metricRecorderFn func(data *queryData) -func (m metricRecorderFn) Record(ctx context.Context, p logql.Params, status string, stats stats.Result) { - m(ctx, p, status, stats) +func (m metricRecorderFn) Record(data *queryData) { + m(data) } type queryData struct { + ctx context.Context params logql.Params statistics *stats.Result - recorded bool + result promql_parser.Value + status string + + recorded bool } func statsHTTPMiddleware(recorder metricRecorder) middleware.Interface { @@ -62,12 +67,9 @@ func statsHTTPMiddleware(recorder metricRecorder) middleware.Interface { if data.statistics == nil { data.statistics = &stats.Result{} } - recorder.Record( - r.Context(), - data.params, - strconv.Itoa(interceptor.statusCode), - *data.statistics, - ) + data.ctx = r.Context() + data.status = strconv.Itoa(interceptor.statusCode) + recorder.Record(data) } }) }) @@ -85,10 +87,12 @@ func StatsCollectorMiddleware() queryrange.Middleware { // collect stats and status var statistics *stats.Result + var res promql_parser.Value if resp != nil { switch r := resp.(type) { case *LokiResponse: statistics = &r.Statistics + res = logql.Streams(r.Data.Result) case *LokiPromResponse: statistics = &r.Statistics default: @@ -105,6 +109,7 @@ func StatsCollectorMiddleware() queryrange.Middleware { data.recorded = true data.statistics = statistics data.params = paramsFromRequest(req) + data.result = res } return resp, err }) diff --git a/pkg/querier/queryrange/stats_test.go b/pkg/querier/queryrange/stats_test.go index 58af159fae38..b7f0ddb64b4c 100644 --- a/pkg/querier/queryrange/stats_test.go +++ b/pkg/querier/queryrange/stats_test.go @@ -13,7 +13,6 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/stats" ) @@ -70,7 +69,7 @@ func Test_StatsHTTP(t *testing.T) { for _, test := range []struct { name string next http.Handler - expect func(t *testing.T, ctx context.Context, p logql.Params, status string, stats stats.Result) + expect func(t *testing.T, data *queryData) }{ { "should not record metric if nothing is recorded", @@ -78,7 +77,7 @@ func Test_StatsHTTP(t *testing.T) { data := r.Context().Value(ctxKey).(*queryData) data.recorded = false }), - func(t *testing.T, ctx context.Context, p logql.Params, status string, stats stats.Result) { + func(t *testing.T, data *queryData) { t.Fail() }, }, @@ -94,12 +93,12 @@ func Test_StatsHTTP(t *testing.T) { }) data.statistics = nil }), - func(t *testing.T, ctx context.Context, p logql.Params, status string, s stats.Result) { - require.Equal(t, fmt.Sprintf("%d", http.StatusOK), status) - require.Equal(t, "foo", p.Query()) - require.Equal(t, logproto.BACKWARD, p.Direction()) - require.Equal(t, uint32(100), p.Limit()) - require.Equal(t, stats.Result{}, s) + func(t *testing.T, data *queryData) { + require.Equal(t, fmt.Sprintf("%d", http.StatusOK), data.status) + require.Equal(t, "foo", data.params.Query()) + require.Equal(t, logproto.BACKWARD, data.params.Direction()) + require.Equal(t, uint32(100), data.params.Limit()) + require.Equal(t, stats.Result{}, *data.statistics) }, }, { @@ -115,18 +114,41 @@ func Test_StatsHTTP(t *testing.T) { data.statistics = &statsResult w.WriteHeader(http.StatusTeapot) }), - func(t *testing.T, ctx context.Context, p logql.Params, status string, s stats.Result) { - require.Equal(t, fmt.Sprintf("%d", http.StatusTeapot), status) - require.Equal(t, "foo", p.Query()) - require.Equal(t, logproto.BACKWARD, p.Direction()) - require.Equal(t, uint32(100), p.Limit()) - require.Equal(t, statsResult, s) + func(t *testing.T, data *queryData) { + require.Equal(t, fmt.Sprintf("%d", http.StatusTeapot), data.status) + require.Equal(t, "foo", data.params.Query()) + require.Equal(t, logproto.BACKWARD, data.params.Direction()) + require.Equal(t, uint32(100), data.params.Limit()) + require.Equal(t, statsResult, *data.statistics) + }, + }, + { + "result", + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + data := r.Context().Value(ctxKey).(*queryData) + data.recorded = true + data.params = paramsFromRequest(&LokiRequest{ + Query: "foo", + Direction: logproto.BACKWARD, + Limit: 100, + }) + data.statistics = &statsResult + data.result = streams + w.WriteHeader(http.StatusTeapot) + }), + func(t *testing.T, data *queryData) { + require.Equal(t, fmt.Sprintf("%d", http.StatusTeapot), data.status) + require.Equal(t, "foo", data.params.Query()) + require.Equal(t, logproto.BACKWARD, data.params.Direction()) + require.Equal(t, uint32(100), data.params.Limit()) + require.Equal(t, statsResult, *data.statistics) + require.Equal(t, streams, data.result) }, }, } { t.Run(test.name, func(t *testing.T) { - statsHTTPMiddleware(metricRecorderFn(func(ctx context.Context, p logql.Params, status string, stats stats.Result) { - test.expect(t, ctx, p, status, stats) + statsHTTPMiddleware(metricRecorderFn(func(data *queryData) { + test.expect(t, data) })).Wrap(test.next).ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "/foo", strings.NewReader(""))) }) }