diff --git a/clients/pkg/logentry/stages/limit.go b/clients/pkg/logentry/stages/limit.go new file mode 100644 index 000000000000..44c48af1d0b1 --- /dev/null +++ b/clients/pkg/logentry/stages/limit.go @@ -0,0 +1,90 @@ +package stages + +import ( + "context" + + "github.com/go-kit/log" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/time/rate" +) + +const ( + ErrLimitStageInvalidRateOrBurst = "limit stage failed to parse rate or burst" +) + +var ratelimitDropReason = "ratelimit_drop_stage" + +type LimitConfig struct { + Rate float64 `mapstructure:"rate"` + Burst int `mapstructure:"burst"` + Drop bool `mapstructure:"drop"` +} + +func newLimitStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) { + cfg := &LimitConfig{} + + err := mapstructure.WeakDecode(config, cfg) + if err != nil { + return nil, err + } + err = validateLimitConfig(cfg) + if err != nil { + return nil, err + } + + r := &limitStage{ + logger: log.With(logger, "component", "stage", "type", "limit"), + cfg: cfg, + dropCount: getDropCountMetric(registerer), + rateLimiter: rate.NewLimiter(rate.Limit(cfg.Rate), cfg.Burst), + } + return r, nil +} + +func validateLimitConfig(cfg *LimitConfig) error { + if cfg.Rate <= 0 || cfg.Burst <= 0 { + return errors.Errorf(ErrLimitStageInvalidRateOrBurst) + } + return nil +} + +// limitStage applies Label matchers to determine if the include stages should be run +type limitStage struct { + logger log.Logger + cfg *LimitConfig + rateLimiter *rate.Limiter + dropCount *prometheus.CounterVec +} + +func (m *limitStage) Run(in chan Entry) chan Entry { + out := make(chan Entry) + go func() { + defer close(out) + for e := range in { + if !m.shouldThrottle() { + out <- e + continue + } + } + }() + return out +} + +func (m *limitStage) shouldThrottle() bool { + if m.cfg.Drop { + if m.rateLimiter.Allow() { + return false + } + m.dropCount.WithLabelValues(ratelimitDropReason).Inc() + return true + } + _ = m.rateLimiter.Wait(context.Background()) + return false +} + +// Name implements Stage +func (m *limitStage) Name() string { + return StageTypeLimit +} diff --git a/clients/pkg/logentry/stages/limit_test.go b/clients/pkg/logentry/stages/limit_test.go new file mode 100644 index 000000000000..baf39b10a1c5 --- /dev/null +++ b/clients/pkg/logentry/stages/limit_test.go @@ -0,0 +1,77 @@ +package stages + +import ( + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + util_log "github.com/grafana/loki/pkg/util/log" +) + +// Not all these are tested but are here to make sure the different types marshal without error +var testLimitWaitYaml = ` +pipeline_stages: +- json: + expressions: + app: + msg: +- limit: + rate: 1 + burst: 1 + drop: false +` + +// Not all these are tested but are here to make sure the different types marshal without error +var testLimitDropYaml = ` +pipeline_stages: +- json: + expressions: + app: + msg: +- limit: + rate: 1 + burst: 1 + drop: true +` + +// TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline +func TestLimitWaitPipeline(t *testing.T) { + registry := prometheus.NewRegistry() + plName := "testPipeline" + pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitWaitYaml), &plName, registry) + logs := make([]Entry, 0) + logCount := 5 + for i := 0; i < logCount; i++ { + logs = append(logs, newEntry(nil, model.LabelSet{"app": "loki"}, testMatchLogLineApp1, time.Now())) + } + require.NoError(t, err) + out := processEntries(pl, + logs..., + ) + // Only the second line will go through. + assert.Len(t, out, logCount) + assert.Equal(t, out[0].Line, testMatchLogLineApp1) +} + +// TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline +func TestLimitDropPipeline(t *testing.T) { + registry := prometheus.NewRegistry() + plName := "testPipeline" + pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitDropYaml), &plName, registry) + logs := make([]Entry, 0) + logCount := 10 + for i := 0; i < logCount; i++ { + logs = append(logs, newEntry(nil, model.LabelSet{"app": "loki"}, testMatchLogLineApp1, time.Now())) + } + require.NoError(t, err) + out := processEntries(pl, + logs..., + ) + // Only the second line will go through. + assert.Len(t, out, 1) + assert.Equal(t, out[0].Line, testMatchLogLineApp1) +} diff --git a/clients/pkg/logentry/stages/stage.go b/clients/pkg/logentry/stages/stage.go index 66b3feec32b7..1c5ca55f4a13 100644 --- a/clients/pkg/logentry/stages/stage.go +++ b/clients/pkg/logentry/stages/stage.go @@ -31,6 +31,7 @@ const ( StageTypePipeline = "pipeline" StageTypeTenant = "tenant" StageTypeDrop = "drop" + StageTypeLimit = "limit" StageTypeMultiline = "multiline" StageTypePack = "pack" StageTypeLabelAllow = "labelallow" @@ -183,6 +184,11 @@ func New(logger log.Logger, jobName *string, stageType string, if err != nil { return nil, err } + case StageTypeLimit: + s, err = newLimitStage(logger, cfg, registerer) + if err != nil { + return nil, err + } case StageTypeMultiline: s, err = newMultilineStage(logger, cfg) if err != nil { diff --git a/docs/sources/clients/promtail/pipelines.md b/docs/sources/clients/promtail/pipelines.md index d7d31e9e88aa..3eec9f760d05 100644 --- a/docs/sources/clients/promtail/pipelines.md +++ b/docs/sources/clients/promtail/pipelines.md @@ -221,3 +221,4 @@ Filtering stages: - [match](../stages/match/): Conditionally run stages based on the label set. - [drop](../stages/drop/): Conditionally drop log lines based on several options. + - [limit](../stages/limit/): Conditionally rate limit log lines based on several options. diff --git a/docs/sources/clients/promtail/stages/limit.md b/docs/sources/clients/promtail/stages/limit.md new file mode 100644 index 000000000000..e9028eaa9177 --- /dev/null +++ b/docs/sources/clients/promtail/stages/limit.md @@ -0,0 +1,58 @@ +--- +title: limit +--- +# `limit` stage + +The `limit` stage is a rate-limiting stage that throttles logs based on several options. + +## Limit stage schema + +This pipeline stage places limits on the rate or burst quantity of log lines that Promtail pushes to Loki. +The concept of having distinct burst and rate limits mirrors the approach to limits that can be set for Loki's distributor component: `ingestion_rate_mb` and `ingestion_burst_size_mb`, as defined in [limits_config](../../../../configuration/#limits_config). + +```yaml +limit: + # The rate limit in lines per second that Promtail will push to Loki + [rate: ] + + # The cap in the quantity of burst lines that Promtail will push to Loki + [burst: ] + + # When drop is true, log lines that exceed the current rate limit will be discarded. + # When drop is false, log lines that exceed the current rate limit will only wait + # to enter the back pressure mode. + [drop: | default = false] +``` + +## Examples + +The following are examples showing the use of the `limit` stage. + +### limit + +Simple `limit` stage configurations. + +#### Match a line and throttle + +Given the pipeline: + +```yaml +- limit: + rate: 10 + burst: 10 +``` + +Would throttle any log line. + +#### Match a line and drop + +Given the pipeline: + +```yaml +- limit: + rate: 10 + burst: 10 + drop: true +``` + +Would throttle any log line and drop logs when rate limit.