Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051

Merged
merged 22 commits into from
Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
311769f
[new] promtail pipeline: Promtail Rate Limit stage #5048
liguozhong Jan 5, 2022
ba7f04b
[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051
liguozhong Jan 6, 2022
bea828a
[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051
liguozhong Jan 6, 2022
e19b0d7
[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051
liguozhong Jan 11, 2022
f45bc3b
Update clients/pkg/logentry/stages/limit.go
liguozhong Jan 11, 2022
b9db3ea
[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051
liguozhong Jan 11, 2022
e1e89ba
[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051
liguozhong Jan 11, 2022
d843ef5
[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051
liguozhong Jan 12, 2022
9f688ec
Update docs/sources/clients/promtail/pipelines.md
liguozhong Jan 14, 2022
f5b397c
Update docs/sources/clients/promtail/stages/limit.md
liguozhong Jan 14, 2022
317103f
Update docs/sources/clients/promtail/stages/limit.md
liguozhong Jan 14, 2022
d709e7b
[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051
liguozhong Jan 14, 2022
cee7dd9
[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051
liguozhong Jan 14, 2022
e909d2f
[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051
liguozhong Jan 14, 2022
5d6ba9a
[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051
liguozhong Jan 14, 2022
c07bcff
Update docs/sources/clients/promtail/stages/limit.md
cyriltovena Mar 2, 2022
5112902
Update docs/sources/clients/promtail/stages/limit.md
cyriltovena Mar 2, 2022
5ec6018
Merge branch 'main' into rate_limit_stage
liguozhong Mar 4, 2022
a5aaa4e
fix import package
liguozhong Mar 4, 2022
d034653
Update docs/sources/clients/promtail/stages/limit.md
liguozhong Mar 31, 2022
57a5dbe
Update docs/sources/clients/promtail/stages/limit.md
liguozhong Mar 31, 2022
7b5defb
Update docs/sources/clients/promtail/pipelines.md
liguozhong Mar 31, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions clients/pkg/logentry/stages/limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package stages

import (
"context"

"github.com/go-kit/log"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
)

const (
ErrLimitStageInvalidRateOrBurst = "limit stage failed to parse rate or burst"
)

var ratelimitDropReason = "ratelimit_drop_stage"

type LimitConfig struct {
Rate float64 `mapstructure:"rate"`
Burst int `mapstructure:"burst"`
Drop bool `mapstructure:"drop"`
}

func newLimitStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) {
cfg := &LimitConfig{}

err := mapstructure.WeakDecode(config, cfg)
if err != nil {
return nil, err
}
err = validateLimitConfig(cfg)
if err != nil {
return nil, err
}

r := &limitStage{
logger: log.With(logger, "component", "stage", "type", "limit"),
cfg: cfg,
dropCount: getDropCountMetric(registerer),
rateLimiter: rate.NewLimiter(rate.Limit(cfg.Rate), cfg.Burst),
}
return r, nil
}

func validateLimitConfig(cfg *LimitConfig) error {
if cfg.Rate <= 0 || cfg.Burst <= 0 {
return errors.Errorf(ErrLimitStageInvalidRateOrBurst)
}
return nil
}

// limitStage applies Label matchers to determine if the include stages should be run
type limitStage struct {
logger log.Logger
cfg *LimitConfig
rateLimiter *rate.Limiter
dropCount *prometheus.CounterVec
}

func (m *limitStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)
for e := range in {
if !m.shouldThrottle() {
out <- e
continue
}
}
}()
return out
}

func (m *limitStage) shouldThrottle() bool {
if m.cfg.Drop {
if m.rateLimiter.Allow() {
return false
}
m.dropCount.WithLabelValues(ratelimitDropReason).Inc()
return true
}
_ = m.rateLimiter.Wait(context.Background())
liguozhong marked this conversation as resolved.
Show resolved Hide resolved
return false
}

// Name implements Stage
func (m *limitStage) Name() string {
return StageTypeLimit
}
77 changes: 77 additions & 0 deletions clients/pkg/logentry/stages/limit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package stages

import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

util_log "github.com/grafana/loki/pkg/util/log"
)

// Not all these are tested but are here to make sure the different types marshal without error
var testLimitWaitYaml = `
pipeline_stages:
- json:
expressions:
app:
msg:
- limit:
rate: 1
burst: 1
drop: false
`

// Not all these are tested but are here to make sure the different types marshal without error
var testLimitDropYaml = `
pipeline_stages:
- json:
expressions:
app:
msg:
- limit:
rate: 1
burst: 1
drop: true
`

// TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitWaitPipeline(t *testing.T) {
liguozhong marked this conversation as resolved.
Show resolved Hide resolved
registry := prometheus.NewRegistry()
plName := "testPipeline"
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitWaitYaml), &plName, registry)
logs := make([]Entry, 0)
logCount := 5
for i := 0; i < logCount; i++ {
logs = append(logs, newEntry(nil, model.LabelSet{"app": "loki"}, testMatchLogLineApp1, time.Now()))
}
require.NoError(t, err)
out := processEntries(pl,
logs...,
)
// Only the second line will go through.
assert.Len(t, out, logCount)
assert.Equal(t, out[0].Line, testMatchLogLineApp1)
}

// TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitDropPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
plName := "testPipeline"
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitDropYaml), &plName, registry)
logs := make([]Entry, 0)
logCount := 10
for i := 0; i < logCount; i++ {
logs = append(logs, newEntry(nil, model.LabelSet{"app": "loki"}, testMatchLogLineApp1, time.Now()))
}
require.NoError(t, err)
out := processEntries(pl,
logs...,
)
// Only the second line will go through.
assert.Len(t, out, 1)
assert.Equal(t, out[0].Line, testMatchLogLineApp1)
}
6 changes: 6 additions & 0 deletions clients/pkg/logentry/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
StageTypePipeline = "pipeline"
StageTypeTenant = "tenant"
StageTypeDrop = "drop"
StageTypeLimit = "limit"
StageTypeMultiline = "multiline"
StageTypePack = "pack"
StageTypeLabelAllow = "labelallow"
Expand Down Expand Up @@ -183,6 +184,11 @@ func New(logger log.Logger, jobName *string, stageType string,
if err != nil {
return nil, err
}
case StageTypeLimit:
s, err = newLimitStage(logger, cfg, registerer)
if err != nil {
return nil, err
}
case StageTypeMultiline:
s, err = newMultilineStage(logger, cfg)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions docs/sources/clients/promtail/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,4 @@ Filtering stages:

- [match](../stages/match/): Conditionally run stages based on the label set.
- [drop](../stages/drop/): Conditionally drop log lines based on several options.
- [limit](../stages/limit/): Conditionally rate limit log lines based on several options.
58 changes: 58 additions & 0 deletions docs/sources/clients/promtail/stages/limit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
---
title: limit
---
# `limit` stage

The `limit` stage is a rate-limiting stage that throttles logs based on several options.

## Limit stage schema

This pipeline stage places limits on the rate or burst quantity of log lines that Promtail pushes to Loki.
The concept of having distinct burst and rate limits mirrors the approach to limits that can be set for Loki's distributor component: `ingestion_rate_mb` and `ingestion_burst_size_mb`, as defined in [limits_config](../../../../configuration/#limits_config).

```yaml
limit:
# The rate limit in lines per second that Promtail will push to Loki
[rate: <int>]

# The cap in the quantity of burst lines that Promtail will push to Loki
[burst: <int>]

# When drop is true, log lines that exceed the current rate limit will be discarded.
# When drop is false, log lines that exceed the current rate limit will only wait
# to enter the back pressure mode.
[drop: <bool> | default = false]
```

## Examples

The following are examples showing the use of the `limit` stage.

### limit

Simple `limit` stage configurations.

#### Match a line and throttle

Given the pipeline:

```yaml
- limit:
rate: 10
burst: 10
```

Would throttle any log line.

#### Match a line and drop

Given the pipeline:

```yaml
- limit:
rate: 10
burst: 10
drop: true
```

Would throttle any log line and drop logs when rate limit.