Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add limit and line_returned in the query log. #3423

Merged
merged 2 commits into from
Mar 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ func (Streams) String() string {
return ""
}

func (streams Streams) lines() int64 {
var res int64
for _, s := range streams {
res += int64(len(s.Entries))
}
return res
}

// Result is the result of a query execution.
type Result struct {
Data promql_parser.Value
Expand Down Expand Up @@ -159,7 +167,7 @@ func (q *query) Exec(ctx context.Context) (Result, error) {
}

if q.record {
RecordMetrics(ctx, q.params, status, statResult)
RecordMetrics(ctx, q.params, status, statResult, data)
}

return Result{
Expand Down Expand Up @@ -293,7 +301,6 @@ func (q *query) evalLiteral(_ context.Context, expr *literalExpr) (promql_parser
}

return PopulateMatrixFromScalar(s, q.params), nil

}

func PopulateMatrixFromScalar(data promql.Scalar, params Params) promql.Matrix {
Expand Down
18 changes: 14 additions & 4 deletions pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
promql_parser "github.com/prometheus/prometheus/promql/parser"

"github.com/grafana/loki/pkg/logql/stats"
)
Expand Down Expand Up @@ -64,21 +65,28 @@ var (
})
)

func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Result) {
logger := util_log.WithContext(ctx, util_log.Logger)
func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Result, result promql_parser.Value) {
var (
logger = util_log.WithContext(ctx, util_log.Logger)
rt = string(GetRangeType(p))
latencyType = latencyTypeFast
returnedLines = 0
)
queryType, err := QueryType(p.Query())
if err != nil {
level.Warn(logger).Log("msg", "error parsing query type", "err", err)
}
rt := string(GetRangeType(p))

// Tag throughput metric by latency type based on a threshold.
// Latency below the threshold is fast, above is slow.
latencyType := latencyTypeFast
if stats.Summary.ExecTime > slowQueryThresholdSecond {
latencyType = latencyTypeSlow
}

if result != nil && result.Type() == ValueTypeStreams {
returnedLines = int(result.(Streams).lines())
}

// we also log queries, useful for troubleshooting slow queries.
level.Info(logger).Log(
"latency", latencyType, // this can be used to filter log lines.
Expand All @@ -89,6 +97,8 @@ func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Res
"step", p.Step(),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"limit", p.Limit(),
"returned_lines", returnedLines,
"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/logql/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ func TestLogSlowQuery(t *testing.T) {
ExecTime: 25.25,
TotalBytesProcessed: 100000,
},
})
}, Streams{logproto.Stream{Entries: make([]logproto.Entry, 10)}})
require.Equal(t,
fmt.Sprintf(
"level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 throughput=100kB total_bytes=100kB\n",
"level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB\n",
sp.Context().(jaeger.SpanContext).SpanID().String(),
),
buf.String())
Expand Down
31 changes: 18 additions & 13 deletions pkg/querier/queryrange/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
promql_parser "github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/middleware"

"github.com/grafana/loki/pkg/logql"
Expand All @@ -23,27 +24,31 @@ type ctxKeyType string
const ctxKey ctxKeyType = "stats"

var (
defaultMetricRecorder = metricRecorderFn(func(ctx context.Context, p logql.Params, status string, stats stats.Result) {
logql.RecordMetrics(ctx, p, status, stats)
defaultMetricRecorder = metricRecorderFn(func(data *queryData) {
logql.RecordMetrics(data.ctx, data.params, data.status, *data.statistics, data.result)
})
// StatsHTTPMiddleware is an http middleware to record stats for query_range filter.
StatsHTTPMiddleware middleware.Interface = statsHTTPMiddleware(defaultMetricRecorder)
)

type metricRecorder interface {
Record(ctx context.Context, p logql.Params, status string, stats stats.Result)
Record(data *queryData)
}

type metricRecorderFn func(ctx context.Context, p logql.Params, status string, stats stats.Result)
type metricRecorderFn func(data *queryData)

func (m metricRecorderFn) Record(ctx context.Context, p logql.Params, status string, stats stats.Result) {
m(ctx, p, status, stats)
func (m metricRecorderFn) Record(data *queryData) {
m(data)
}

type queryData struct {
ctx context.Context
params logql.Params
statistics *stats.Result
recorded bool
result promql_parser.Value
status string

recorded bool
}

func statsHTTPMiddleware(recorder metricRecorder) middleware.Interface {
Expand All @@ -62,12 +67,9 @@ func statsHTTPMiddleware(recorder metricRecorder) middleware.Interface {
if data.statistics == nil {
data.statistics = &stats.Result{}
}
recorder.Record(
r.Context(),
data.params,
strconv.Itoa(interceptor.statusCode),
*data.statistics,
)
data.ctx = r.Context()
data.status = strconv.Itoa(interceptor.statusCode)
recorder.Record(data)
}
})
})
Expand All @@ -85,10 +87,12 @@ func StatsCollectorMiddleware() queryrange.Middleware {

// collect stats and status
var statistics *stats.Result
var res promql_parser.Value
if resp != nil {
switch r := resp.(type) {
case *LokiResponse:
statistics = &r.Statistics
res = logql.Streams(r.Data.Result)
case *LokiPromResponse:
statistics = &r.Statistics
default:
Expand All @@ -105,6 +109,7 @@ func StatsCollectorMiddleware() queryrange.Middleware {
data.recorded = true
data.statistics = statistics
data.params = paramsFromRequest(req)
data.result = res
}
return resp, err
})
Expand Down
56 changes: 39 additions & 17 deletions pkg/querier/queryrange/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
)

Expand Down Expand Up @@ -70,15 +69,15 @@ func Test_StatsHTTP(t *testing.T) {
for _, test := range []struct {
name string
next http.Handler
expect func(t *testing.T, ctx context.Context, p logql.Params, status string, stats stats.Result)
expect func(t *testing.T, data *queryData)
}{
{
"should not record metric if nothing is recorded",
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data := r.Context().Value(ctxKey).(*queryData)
data.recorded = false
}),
func(t *testing.T, ctx context.Context, p logql.Params, status string, stats stats.Result) {
func(t *testing.T, data *queryData) {
t.Fail()
},
},
Expand All @@ -94,12 +93,12 @@ func Test_StatsHTTP(t *testing.T) {
})
data.statistics = nil
}),
func(t *testing.T, ctx context.Context, p logql.Params, status string, s stats.Result) {
require.Equal(t, fmt.Sprintf("%d", http.StatusOK), status)
require.Equal(t, "foo", p.Query())
require.Equal(t, logproto.BACKWARD, p.Direction())
require.Equal(t, uint32(100), p.Limit())
require.Equal(t, stats.Result{}, s)
func(t *testing.T, data *queryData) {
require.Equal(t, fmt.Sprintf("%d", http.StatusOK), data.status)
require.Equal(t, "foo", data.params.Query())
require.Equal(t, logproto.BACKWARD, data.params.Direction())
require.Equal(t, uint32(100), data.params.Limit())
require.Equal(t, stats.Result{}, *data.statistics)
},
},
{
Expand All @@ -115,18 +114,41 @@ func Test_StatsHTTP(t *testing.T) {
data.statistics = &statsResult
w.WriteHeader(http.StatusTeapot)
}),
func(t *testing.T, ctx context.Context, p logql.Params, status string, s stats.Result) {
require.Equal(t, fmt.Sprintf("%d", http.StatusTeapot), status)
require.Equal(t, "foo", p.Query())
require.Equal(t, logproto.BACKWARD, p.Direction())
require.Equal(t, uint32(100), p.Limit())
require.Equal(t, statsResult, s)
func(t *testing.T, data *queryData) {
require.Equal(t, fmt.Sprintf("%d", http.StatusTeapot), data.status)
require.Equal(t, "foo", data.params.Query())
require.Equal(t, logproto.BACKWARD, data.params.Direction())
require.Equal(t, uint32(100), data.params.Limit())
require.Equal(t, statsResult, *data.statistics)
},
},
{
"result",
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data := r.Context().Value(ctxKey).(*queryData)
data.recorded = true
data.params = paramsFromRequest(&LokiRequest{
Query: "foo",
Direction: logproto.BACKWARD,
Limit: 100,
})
data.statistics = &statsResult
data.result = streams
w.WriteHeader(http.StatusTeapot)
}),
func(t *testing.T, data *queryData) {
require.Equal(t, fmt.Sprintf("%d", http.StatusTeapot), data.status)
require.Equal(t, "foo", data.params.Query())
require.Equal(t, logproto.BACKWARD, data.params.Direction())
require.Equal(t, uint32(100), data.params.Limit())
require.Equal(t, statsResult, *data.statistics)
require.Equal(t, streams, data.result)
},
},
} {
t.Run(test.name, func(t *testing.T) {
statsHTTPMiddleware(metricRecorderFn(func(ctx context.Context, p logql.Params, status string, stats stats.Result) {
test.expect(t, ctx, p, status, stats)
statsHTTPMiddleware(metricRecorderFn(func(data *queryData) {
test.expect(t, data)
})).Wrap(test.next).ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "/foo", strings.NewReader("")))
})
}
Expand Down