Skip to content

Commit

Permalink
Re-introduce #3178. (#3233)
Browse files Browse the repository at this point in the history
Lost during a merge/rebase of 6cc41f9 🤷

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Jan 26, 2021
1 parent 0208071 commit 567ccde
Showing 1 changed file with 59 additions and 3 deletions.
62 changes: 59 additions & 3 deletions pkg/distributor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,35 @@ import (
"math"
"net/http"

"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/util"
"github.com/dustin/go-humanize"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/unmarshal"
unmarshal_legacy "github.com/grafana/loki/pkg/logql/unmarshal/legacy"
lokiutil "github.com/grafana/loki/pkg/util"
)

var contentType = http.CanonicalHeaderKey("Content-Type")
var (
contentType = http.CanonicalHeaderKey("Content-Type")

bytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_bytes_received_total",
Help: "The total number of uncompressed bytes received per tenant",
}, []string{"tenant"})
linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_lines_received_total",
Help: "The total number of lines received per tenant",
}, []string{"tenant"})
)

const applicationJSON = "application/json"

Expand All @@ -41,8 +59,46 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
}

func ParseRequest(r *http.Request) (*logproto.PushRequest, error) {
userID, _ := user.ExtractOrgID(r.Context())
logger := util.WithContext(r.Context(), util.Logger)
body := lokiutil.NewSizeReader(r.Body)
contentType := r.Header.Get(contentType)
var req logproto.PushRequest

defer func() {
var (
entriesSize int64
streamLabelsSize int64
totalEntries int64
)

for _, s := range req.Streams {
streamLabelsSize += int64(len(s.Labels))
for _, e := range s.Entries {
totalEntries++
entriesSize += int64(len(e.Line))
}
}

// incrementing tenant metrics if we have a tenant.
if totalEntries != 0 && userID != "" {
bytesIngested.WithLabelValues(userID).Add(float64(entriesSize))
linesIngested.WithLabelValues(userID).Add(float64(totalEntries))
}

level.Debug(logger).Log(
"msg", "push request parsed",
"path", r.URL.Path,
"contentType", contentType,
"bodySize", humanize.Bytes(uint64(body.Size())),
"streams", len(req.Streams),
"entries", totalEntries,
"streamLabelsSize", humanize.Bytes(uint64(streamLabelsSize)),
"entriesSize", humanize.Bytes(uint64(entriesSize)),
"totalSize", humanize.Bytes(uint64(entriesSize+streamLabelsSize)),
)
}()

switch r.Header.Get(contentType) {
case applicationJSON:
var err error
Expand Down

0 comments on commit 567ccde

Please sign in to comment.