Skip to content

Commit

Permalink
use a global variable to wrap debug log statments in the pipeline to …
Browse files Browse the repository at this point in the history
…prevent unnecessary allocations when debug logging is not enabled
  • Loading branch information
slim-bean committed Sep 6, 2019
1 parent 2064f91 commit 3237b73
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 36 deletions.
7 changes: 7 additions & 0 deletions cmd/promtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/weaveworks/common/logging"

"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/logentry/stages"
"github.com/grafana/loki/pkg/promtail"
"github.com/grafana/loki/pkg/promtail/config"
)
Expand Down Expand Up @@ -46,6 +47,12 @@ func main() {
}
util.InitLogger(&config.ServerConfig.Config)

// Set the global debug variable in the stages package which is used to conditionally log
// debug messages which otherwise cause huge allocations processing log lines for log messages never printed
if config.ServerConfig.Config.LogLevel.String() == "debug" {
stages.Debug = true
}

p, err := promtail.New(config)
if err != nil {
level.Error(util.Logger).Log("msg", "error creating promtail", "error", err)
Expand Down
24 changes: 18 additions & 6 deletions pkg/logentry/stages/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,35 +101,45 @@ func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interfac

if j.cfg.Source != nil {
if _, ok := extracted[*j.cfg.Source]; !ok {
level.Debug(j.logger).Log("msg", "source does not exist in the set of extracted values", "source", *j.cfg.Source)
if Debug {
level.Debug(j.logger).Log("msg", "source does not exist in the set of extracted values", "source", *j.cfg.Source)
}
return
}

value, err := getString(extracted[*j.cfg.Source])
if err != nil {
level.Debug(j.logger).Log("msg", "failed to convert source value to string", "source", *j.cfg.Source, "err", err, "type", reflect.TypeOf(extracted[*j.cfg.Source]).String())
if Debug {
level.Debug(j.logger).Log("msg", "failed to convert source value to string", "source", *j.cfg.Source, "err", err, "type", reflect.TypeOf(extracted[*j.cfg.Source]).String())
}
return
}

input = &value
}

if input == nil {
level.Debug(j.logger).Log("msg", "cannot parse a nil entry")
if Debug {
level.Debug(j.logger).Log("msg", "cannot parse a nil entry")
}
return
}

var data map[string]interface{}

if err := json.Unmarshal([]byte(*input), &data); err != nil {
level.Debug(j.logger).Log("msg", "failed to unmarshal log line", "err", err)
if Debug {
level.Debug(j.logger).Log("msg", "failed to unmarshal log line", "err", err)
}
return
}

for n, e := range j.expressions {
r, err := e.Search(data)
if err != nil {
level.Debug(j.logger).Log("msg", "failed to search JMES expression", "err", err)
if Debug {
level.Debug(j.logger).Log("msg", "failed to search JMES expression", "err", err)
}
continue
}

Expand All @@ -145,7 +155,9 @@ func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interfac
// If the value wasn't a string or a number, marshal it back to json
jm, err := json.Marshal(r)
if err != nil {
level.Debug(j.logger).Log("msg", "failed to marshal complex type back to string", "err", err)
if Debug {
level.Debug(j.logger).Log("msg", "failed to marshal complex type back to string", "err", err)
}
continue
}
extracted[n] = string(jm)
Expand Down
8 changes: 6 additions & 2 deletions pkg/logentry/stages/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,16 @@ func (l *labelStage) Process(labels model.LabelSet, extracted map[string]interfa
lValue := extracted[*lSrc]
s, err := getString(lValue)
if err != nil {
level.Debug(l.logger).Log("msg", "failed to convert extracted label value to string", "err", err, "type", reflect.TypeOf(lValue).String())
if Debug {
level.Debug(l.logger).Log("msg", "failed to convert extracted label value to string", "err", err, "type", reflect.TypeOf(lValue).String())
}
continue
}
labelValue := model.LabelValue(s)
if !labelValue.IsValid() {
level.Debug(l.logger).Log("msg", "invalid label value parsed", "value", labelValue)
if Debug {
level.Debug(l.logger).Log("msg", "invalid label value parsed", "value", labelValue)
}
continue
}
labels[model.LabelName(lName)] = labelValue
Expand Down
44 changes: 30 additions & 14 deletions pkg/logentry/stages/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,11 @@ func (m *metricStage) recordCounter(name string, counter *metric.Counters, label
if counter.Cfg.Value != nil {
stringVal, err := getString(v)
if err != nil {
level.Debug(m.logger).Log("msg", "failed to convert extracted value to string, "+
"can't perform value comparison", "metric", name, "err",
fmt.Sprintf("can't convert %v to string", reflect.TypeOf(v).String()))
if Debug {
level.Debug(m.logger).Log("msg", "failed to convert extracted value to string, "+
"can't perform value comparison", "metric", name, "err",
fmt.Sprintf("can't convert %v to string", reflect.TypeOf(v).String()))
}
return
}
if *counter.Cfg.Value != stringVal {
Expand All @@ -157,7 +159,9 @@ func (m *metricStage) recordCounter(name string, counter *metric.Counters, label
case metric.CounterAdd:
f, err := getFloat(v)
if err != nil || f < 0 {
level.Debug(m.logger).Log("msg", "failed to convert extracted value to positive float", "metric", name, "err", err)
if Debug {
level.Debug(m.logger).Log("msg", "failed to convert extracted value to positive float", "metric", name, "err", err)
}
return
}
counter.With(labels).Add(f)
Expand All @@ -170,9 +174,11 @@ func (m *metricStage) recordGauge(name string, gauge *metric.Gauges, labels mode
if gauge.Cfg.Value != nil {
stringVal, err := getString(v)
if err != nil {
level.Debug(m.logger).Log("msg", "failed to convert extracted value to string, "+
"can't perform value comparison", "metric", name, "err",
fmt.Sprintf("can't convert %v to string", reflect.TypeOf(v).String()))
if Debug {
level.Debug(m.logger).Log("msg", "failed to convert extracted value to string, "+
"can't perform value comparison", "metric", name, "err",
fmt.Sprintf("can't convert %v to string", reflect.TypeOf(v).String()))
}
return
}
if *gauge.Cfg.Value != stringVal {
Expand All @@ -184,7 +190,9 @@ func (m *metricStage) recordGauge(name string, gauge *metric.Gauges, labels mode
case metric.GaugeSet:
f, err := getFloat(v)
if err != nil || f < 0 {
level.Debug(m.logger).Log("msg", "failed to convert extracted value to positive float", "metric", name, "err", err)
if Debug {
level.Debug(m.logger).Log("msg", "failed to convert extracted value to positive float", "metric", name, "err", err)
}
return
}
gauge.With(labels).Set(f)
Expand All @@ -195,14 +203,18 @@ func (m *metricStage) recordGauge(name string, gauge *metric.Gauges, labels mode
case metric.GaugeAdd:
f, err := getFloat(v)
if err != nil || f < 0 {
level.Debug(m.logger).Log("msg", "failed to convert extracted value to positive float", "metric", name, "err", err)
if Debug {
level.Debug(m.logger).Log("msg", "failed to convert extracted value to positive float", "metric", name, "err", err)
}
return
}
gauge.With(labels).Add(f)
case metric.GaugeSub:
f, err := getFloat(v)
if err != nil || f < 0 {
level.Debug(m.logger).Log("msg", "failed to convert extracted value to positive float", "metric", name, "err", err)
if Debug {
level.Debug(m.logger).Log("msg", "failed to convert extracted value to positive float", "metric", name, "err", err)
}
return
}
gauge.With(labels).Sub(f)
Expand All @@ -215,9 +227,11 @@ func (m *metricStage) recordHistogram(name string, histogram *metric.Histograms,
if histogram.Cfg.Value != nil {
stringVal, err := getString(v)
if err != nil {
level.Debug(m.logger).Log("msg", "failed to convert extracted value to string, "+
"can't perform value comparison", "metric", name, "err",
fmt.Sprintf("can't convert %v to string", reflect.TypeOf(v).String()))
if Debug {
level.Debug(m.logger).Log("msg", "failed to convert extracted value to string, "+
"can't perform value comparison", "metric", name, "err",
fmt.Sprintf("can't convert %v to string", reflect.TypeOf(v).String()))
}
return
}
if *histogram.Cfg.Value != stringVal {
Expand All @@ -226,7 +240,9 @@ func (m *metricStage) recordHistogram(name string, histogram *metric.Histograms,
}
f, err := getFloat(v)
if err != nil {
level.Debug(m.logger).Log("msg", "failed to convert extracted value to float", "metric", name, "err", err)
if Debug {
level.Debug(m.logger).Log("msg", "failed to convert extracted value to float", "metric", name, "err", err)
}
return
}
histogram.With(labels).Observe(f)
Expand Down
8 changes: 6 additions & 2 deletions pkg/logentry/stages/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,16 @@ func (o *outputStage) Process(labels model.LabelSet, extracted map[string]interf
if v, ok := extracted[o.cfgs.Source]; ok {
s, err := getString(v)
if err != nil {
level.Debug(o.logger).Log("msg", "extracted output could not be converted to a string", "err", err, "type", reflect.TypeOf(v).String())
if Debug {
level.Debug(o.logger).Log("msg", "extracted output could not be converted to a string", "err", err, "type", reflect.TypeOf(v).String())
}
return
}
*entry = s
} else {
level.Debug(o.logger).Log("msg", "extracted data did not contain output source")
if Debug {
level.Debug(o.logger).Log("msg", "extracted data did not contain output source")
}
}
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/logentry/stages/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,15 @@ func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, regist
func (p *Pipeline) Process(labels model.LabelSet, extracted map[string]interface{}, ts *time.Time, entry *string) {
start := time.Now()
for i, stage := range p.stages {
level.Debug(p.logger).Log("msg", "processing pipeline", "stage", i, "name", stage.Name(), "labels", labels, "time", ts, "entry", entry)
if Debug {
level.Debug(p.logger).Log("msg", "processing pipeline", "stage", i, "name", stage.Name(), "labels", labels, "time", ts, "entry", entry)
}
stage.Process(labels, extracted, ts, entry)
}
dur := time.Since(start).Seconds()
level.Debug(p.logger).Log("msg", "finished processing log line", "labels", labels, "time", ts, "entry", entry, "duration_s", dur)
if Debug {
level.Debug(p.logger).Log("msg", "finished processing log line", "labels", labels, "time", ts, "entry", entry, "duration_s", dur)
}
if p.jobName != nil {
p.plDuration.WithLabelValues(*p.jobName).Observe(dur)
}
Expand Down
16 changes: 12 additions & 4 deletions pkg/logentry/stages/regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,27 +90,35 @@ func (r *regexStage) Process(labels model.LabelSet, extracted map[string]interfa

if r.cfg.Source != nil {
if _, ok := extracted[*r.cfg.Source]; !ok {
level.Debug(r.logger).Log("msg", "source does not exist in the set of extracted values", "source", *r.cfg.Source)
if Debug {
level.Debug(r.logger).Log("msg", "source does not exist in the set of extracted values", "source", *r.cfg.Source)
}
return
}

value, err := getString(extracted[*r.cfg.Source])
if err != nil {
level.Debug(r.logger).Log("msg", "failed to convert source value to string", "source", *r.cfg.Source, "err", err, "type", reflect.TypeOf(extracted[*r.cfg.Source]).String())
if Debug {
level.Debug(r.logger).Log("msg", "failed to convert source value to string", "source", *r.cfg.Source, "err", err, "type", reflect.TypeOf(extracted[*r.cfg.Source]).String())
}
return
}

input = &value
}

if input == nil {
level.Debug(r.logger).Log("msg", "cannot parse a nil entry")
if Debug {
level.Debug(r.logger).Log("msg", "cannot parse a nil entry")
}
return
}

match := r.expression.FindStringSubmatch(*input)
if match == nil {
level.Debug(r.logger).Log("msg", "regex did not match")
if Debug {
level.Debug(r.logger).Log("msg", "regex did not match")
}
return
}

Expand Down
12 changes: 9 additions & 3 deletions pkg/logentry/stages/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,18 @@ func (o *templateStage) Process(labels model.LabelSet, extracted map[string]inte
if v, ok := extracted[o.cfgs.Source]; ok {
s, err := getString(v)
if err != nil {
level.Debug(o.logger).Log("msg", "extracted template could not be converted to a string", "err", err, "type", reflect.TypeOf(v).String())
if Debug {
level.Debug(o.logger).Log("msg", "extracted template could not be converted to a string", "err", err, "type", reflect.TypeOf(v).String())
}
return
}
td := templateData{s}
buf := &bytes.Buffer{}
err = o.template.Execute(buf, td)
if err != nil {
level.Debug(o.logger).Log("msg", "failed to execute template on extracted value", "err", err, "value", v)
if Debug {
level.Debug(o.logger).Log("msg", "failed to execute template on extracted value", "err", err, "value", v)
}
return
}
st := buf.String()
Expand All @@ -113,7 +117,9 @@ func (o *templateStage) Process(labels model.LabelSet, extracted map[string]inte
buf := &bytes.Buffer{}
err := o.template.Execute(buf, td)
if err != nil {
level.Debug(o.logger).Log("msg", "failed to execute template on extracted value", "err", err, "value", v)
if Debug {
level.Debug(o.logger).Log("msg", "failed to execute template on extracted value", "err", err, "value", v)
}
return
}
st := buf.String()
Expand Down
12 changes: 9 additions & 3 deletions pkg/logentry/stages/timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,23 @@ func (ts *timestampStage) Process(labels model.LabelSet, extracted map[string]in
if v, ok := extracted[ts.cfgs.Source]; ok {
s, err := getString(v)
if err != nil {
level.Debug(ts.logger).Log("msg", "failed to convert extracted time to string", "err", err, "type", reflect.TypeOf(v).String())
if Debug {
level.Debug(ts.logger).Log("msg", "failed to convert extracted time to string", "err", err, "type", reflect.TypeOf(v).String())
}
}

parsedTs, err := ts.parser(s)
if err != nil {
level.Debug(ts.logger).Log("msg", "failed to parse time", "err", err, "format", ts.cfgs.Format, "value", s)
if Debug {
level.Debug(ts.logger).Log("msg", "failed to parse time", "err", err, "format", ts.cfgs.Format, "value", s)
}
} else {
*t = parsedTs
}
} else {
level.Debug(ts.logger).Log("msg", "extracted data did not contain a timestamp")
if Debug {
level.Debug(ts.logger).Log("msg", "extracted data did not contain a timestamp")
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/logentry/stages/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ import (
"time"
)

var (
// Debug is used to wrap debug log statements, the go-kit logger won't let us introspect the current log level
// so this global is used for that purpose. This allows us to skip allocations of log messages at the
// debug level when debug level logging is not enabled. Log level allocations can become very expensive
// as we log numerous log entries per log line at debug level.
Debug = false
)

const (
ErrTimestampContainsYear = "timestamp '%s' is expected to not contain the year date component"
)
Expand Down

0 comments on commit 3237b73

Please sign in to comment.