Skip to content

Commit

Permalink
[new] promtail pipeline: Promtail Rate Limit stage grafana#5048 (graf…
Browse files Browse the repository at this point in the history
…ana#5051) (grafana#5793)

* [new] promtail pipeline:  Promtail Rate Limit stage grafana#5048

* [new] promtail pipeline: Promtail Rate Limit stage grafana#5048 grafana#5051

* [new] promtail pipeline: Promtail Rate Limit stage grafana#5048 grafana#5051

* [new] promtail pipeline: Promtail Rate Limit stage grafana#5048 grafana#5051

* Update clients/pkg/logentry/stages/limit.go

Co-authored-by: Karsten Jeschkies <k@jeschkies.xyz>

* [new] promtail pipeline: Promtail Rate Limit stage grafana#5048 grafana#5051

* [new] promtail pipeline: Promtail Rate Limit stage grafana#5048 grafana#5051

* [new] promtail pipeline: Promtail Rate Limit stage grafana#5048 grafana#5051

* Update docs/sources/clients/promtail/pipelines.md

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

* Update docs/sources/clients/promtail/stages/limit.md

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

* Update docs/sources/clients/promtail/stages/limit.md

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

* [new] promtail pipeline: Promtail Rate Limit stage grafana#5048 grafana#5051

* [new] promtail pipeline: Promtail Rate Limit stage grafana#5048 grafana#5051

* [new] promtail pipeline: Promtail Rate Limit stage grafana#5048 grafana#5051

* [new] promtail pipeline: Promtail Rate Limit stage grafana#5048 grafana#5051

* Update docs/sources/clients/promtail/stages/limit.md

* Update docs/sources/clients/promtail/stages/limit.md

* fix import package

* Update docs/sources/clients/promtail/stages/limit.md

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

* Update docs/sources/clients/promtail/stages/limit.md

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

* Update docs/sources/clients/promtail/pipelines.md

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

Co-authored-by: Karsten Jeschkies <k@jeschkies.xyz>
Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>
Co-authored-by: Cyril Tovena <cyril.tovena@gmail.com>
(cherry picked from commit 098b5a2)

Co-authored-by: 李国忠 <249032432@qq.com>
  • Loading branch information
2 people authored and splitice committed May 21, 2022
1 parent f0253e6 commit e835c2c
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 0 deletions.
90 changes: 90 additions & 0 deletions clients/pkg/logentry/stages/limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package stages

import (
"context"

"github.com/go-kit/log"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
)

const (
ErrLimitStageInvalidRateOrBurst = "limit stage failed to parse rate or burst"
)

var ratelimitDropReason = "ratelimit_drop_stage"

type LimitConfig struct {
Rate float64 `mapstructure:"rate"`
Burst int `mapstructure:"burst"`
Drop bool `mapstructure:"drop"`
}

func newLimitStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) {
cfg := &LimitConfig{}

err := mapstructure.WeakDecode(config, cfg)
if err != nil {
return nil, err
}
err = validateLimitConfig(cfg)
if err != nil {
return nil, err
}

r := &limitStage{
logger: log.With(logger, "component", "stage", "type", "limit"),
cfg: cfg,
dropCount: getDropCountMetric(registerer),
rateLimiter: rate.NewLimiter(rate.Limit(cfg.Rate), cfg.Burst),
}
return r, nil
}

func validateLimitConfig(cfg *LimitConfig) error {
if cfg.Rate <= 0 || cfg.Burst <= 0 {
return errors.Errorf(ErrLimitStageInvalidRateOrBurst)
}
return nil
}

// limitStage applies Label matchers to determine if the include stages should be run
type limitStage struct {
logger log.Logger
cfg *LimitConfig
rateLimiter *rate.Limiter
dropCount *prometheus.CounterVec
}

func (m *limitStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)
for e := range in {
if !m.shouldThrottle() {
out <- e
continue
}
}
}()
return out
}

func (m *limitStage) shouldThrottle() bool {
if m.cfg.Drop {
if m.rateLimiter.Allow() {
return false
}
m.dropCount.WithLabelValues(ratelimitDropReason).Inc()
return true
}
_ = m.rateLimiter.Wait(context.Background())
return false
}

// Name implements Stage
func (m *limitStage) Name() string {
return StageTypeLimit
}
77 changes: 77 additions & 0 deletions clients/pkg/logentry/stages/limit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package stages

import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

util_log "github.com/grafana/loki/pkg/util/log"
)

// Not all these are tested but are here to make sure the different types marshal without error
var testLimitWaitYaml = `
pipeline_stages:
- json:
expressions:
app:
msg:
- limit:
rate: 1
burst: 1
drop: false
`

// Not all these are tested but are here to make sure the different types marshal without error
var testLimitDropYaml = `
pipeline_stages:
- json:
expressions:
app:
msg:
- limit:
rate: 1
burst: 1
drop: true
`

// TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitWaitPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
plName := "testPipeline"
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitWaitYaml), &plName, registry)
logs := make([]Entry, 0)
logCount := 5
for i := 0; i < logCount; i++ {
logs = append(logs, newEntry(nil, model.LabelSet{"app": "loki"}, testMatchLogLineApp1, time.Now()))
}
require.NoError(t, err)
out := processEntries(pl,
logs...,
)
// Only the second line will go through.
assert.Len(t, out, logCount)
assert.Equal(t, out[0].Line, testMatchLogLineApp1)
}

// TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitDropPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
plName := "testPipeline"
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitDropYaml), &plName, registry)
logs := make([]Entry, 0)
logCount := 10
for i := 0; i < logCount; i++ {
logs = append(logs, newEntry(nil, model.LabelSet{"app": "loki"}, testMatchLogLineApp1, time.Now()))
}
require.NoError(t, err)
out := processEntries(pl,
logs...,
)
// Only the second line will go through.
assert.Len(t, out, 1)
assert.Equal(t, out[0].Line, testMatchLogLineApp1)
}
6 changes: 6 additions & 0 deletions clients/pkg/logentry/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
StageTypePipeline = "pipeline"
StageTypeTenant = "tenant"
StageTypeDrop = "drop"
StageTypeLimit = "limit"
StageTypeMultiline = "multiline"
StageTypePack = "pack"
StageTypeLabelAllow = "labelallow"
Expand Down Expand Up @@ -183,6 +184,11 @@ func New(logger log.Logger, jobName *string, stageType string,
if err != nil {
return nil, err
}
case StageTypeLimit:
s, err = newLimitStage(logger, cfg, registerer)
if err != nil {
return nil, err
}
case StageTypeMultiline:
s, err = newMultilineStage(logger, cfg)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions docs/sources/clients/promtail/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,4 @@ Filtering stages:

- [match](../stages/match/): Conditionally run stages based on the label set.
- [drop](../stages/drop/): Conditionally drop log lines based on several options.
- [limit](../stages/limit/): Conditionally rate limit log lines based on several options.
58 changes: 58 additions & 0 deletions docs/sources/clients/promtail/stages/limit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
---
title: limit
---
# `limit` stage

The `limit` stage is a rate-limiting stage that throttles logs based on several options.

## Limit stage schema

This pipeline stage places limits on the rate or burst quantity of log lines that Promtail pushes to Loki.
The concept of having distinct burst and rate limits mirrors the approach to limits that can be set for Loki's distributor component: `ingestion_rate_mb` and `ingestion_burst_size_mb`, as defined in [limits_config](../../../../configuration/#limits_config).

```yaml
limit:
# The rate limit in lines per second that Promtail will push to Loki
[rate: <int>]

# The cap in the quantity of burst lines that Promtail will push to Loki
[burst: <int>]

# When drop is true, log lines that exceed the current rate limit will be discarded.
# When drop is false, log lines that exceed the current rate limit will only wait
# to enter the back pressure mode.
[drop: <bool> | default = false]
```
## Examples
The following are examples showing the use of the `limit` stage.

### limit

Simple `limit` stage configurations.

#### Match a line and throttle

Given the pipeline:

```yaml
- limit:
rate: 10
burst: 10
```

Would throttle any log line.

#### Match a line and drop

Given the pipeline:

```yaml
- limit:
rate: 10
burst: 10
drop: true
```

Would throttle any log line and drop logs when rate limit.

0 comments on commit e835c2c

Please sign in to comment.