From 8f87f7bfbe397c22a97fef70ab135d10e6068b2f Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 15 Jan 2021 12:21:36 +0100 Subject: [PATCH] Logs PushRequest data. This will allows to find information about received size and total entries per tenant. Example of a log from my dev testing: ``` level=debug ts=2021-01-15T11:16:21.735663076Z caller=http.go:67 org_id=3927 traceID=11c4774c6ec4bbf4 msg="push request parsed" path=/loki/api/v1/push content-type=application/x-protobuf body-size="11 kB" streams=5 entries=298 streamLabelsSize="1.9 kB" entriesSize="45 kB" totalSize="47 kB" ``` Of course this means we can use LogQL on this. Signed-off-by: Cyril Tovena --- pkg/distributor/http.go | 43 ++++++++++++++++++++++++++++++++++++----- pkg/util/reader.go | 31 +++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 5 deletions(-) create mode 100644 pkg/util/reader.go diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go index b25e3961bf0f..54d5296d0f8e 100644 --- a/pkg/distributor/http.go +++ b/pkg/distributor/http.go @@ -4,6 +4,8 @@ import ( "math" "net/http" + "github.com/dustin/go-humanize" + "github.com/go-kit/kit/log/level" "github.com/weaveworks/common/httpgrpc" "github.com/cortexproject/cortex/pkg/util" @@ -12,6 +14,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/unmarshal" unmarshal_legacy "github.com/grafana/loki/pkg/logql/unmarshal/legacy" + lokiutil "github.com/grafana/loki/pkg/util" ) var contentType = http.CanonicalHeaderKey("Content-Type") @@ -20,7 +23,6 @@ const applicationJSON = "application/json" // PushHandler reads a snappy-compressed proto from the HTTP body. func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { - req, err := ParseRequest(r) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -44,14 +46,45 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { func ParseRequest(r *http.Request) (*logproto.PushRequest, error) { var req logproto.PushRequest - switch r.Header.Get(contentType) { + logger := util.WithContext(r.Context(), util.Logger) + body := lokiutil.NewSizeReader(r.Body) + contentType := r.Header.Get(contentType) + + defer func() { + var ( + entriesSize int64 + streamLabelsSize int64 + totalEntries int64 + ) + + for _, s := range req.Streams { + streamLabelsSize += int64(len(s.Labels)) + for _, e := range s.Entries { + totalEntries++ + entriesSize += int64(len(e.Line)) + } + } + level.Debug(logger).Log( + "msg", "push request parsed", + "path", r.URL.Path, + "content-type", contentType, + "body-size", humanize.Bytes(uint64(body.Size())), + "streams", len(req.Streams), + "entries", totalEntries, + "streamLabelsSize", humanize.Bytes(uint64(streamLabelsSize)), + "entriesSize", humanize.Bytes(uint64(entriesSize)), + "totalSize", humanize.Bytes(uint64(entriesSize+streamLabelsSize)), + ) + }() + + switch contentType { case applicationJSON: var err error if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 { - err = unmarshal.DecodePushRequest(r.Body, &req) + err = unmarshal.DecodePushRequest(body, &req) } else { - err = unmarshal_legacy.DecodePushRequest(r.Body, &req) + err = unmarshal_legacy.DecodePushRequest(body, &req) } if err != nil { @@ -59,7 +92,7 @@ func ParseRequest(r *http.Request) (*logproto.PushRequest, error) { } default: - if err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil { + if err := util.ParseProtoReader(r.Context(), body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil { return nil, err } } diff --git a/pkg/util/reader.go b/pkg/util/reader.go new file mode 100644 index 000000000000..47a5764143bd --- /dev/null +++ b/pkg/util/reader.go @@ -0,0 +1,31 @@ +package util + +import ( + "io" +) + +type sizeReader struct { + size int64 + r io.Reader +} + +type SizeReader interface { + io.Reader + Size() int64 +} + +// NewSizeReader returns an io.Reader that will have the number of bytes +// read from r available. +func NewSizeReader(r io.Reader) SizeReader { + return &sizeReader{r: r} +} + +func (v *sizeReader) Read(p []byte) (int, error) { + n, err := v.r.Read(p) + v.size += int64(n) + return n, err +} + +func (v *sizeReader) Size() int64 { + return v.size +}