Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051

Merged
merged 22 commits into from
Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
311769f
[new] promtail pipeline: Promtail Rate Limit stage #5048
liguozhong Jan 5, 2022
ba7f04b
[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051
liguozhong Jan 6, 2022
bea828a
[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051
liguozhong Jan 6, 2022
e19b0d7
[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051
liguozhong Jan 11, 2022
f45bc3b
Update clients/pkg/logentry/stages/limit.go
liguozhong Jan 11, 2022
b9db3ea
[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051
liguozhong Jan 11, 2022
e1e89ba
[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051
liguozhong Jan 11, 2022
d843ef5
[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051
liguozhong Jan 12, 2022
9f688ec
Update docs/sources/clients/promtail/pipelines.md
liguozhong Jan 14, 2022
f5b397c
Update docs/sources/clients/promtail/stages/limit.md
liguozhong Jan 14, 2022
317103f
Update docs/sources/clients/promtail/stages/limit.md
liguozhong Jan 14, 2022
d709e7b
[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051
liguozhong Jan 14, 2022
cee7dd9
[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051
liguozhong Jan 14, 2022
e909d2f
[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051
liguozhong Jan 14, 2022
5d6ba9a
[new] promtail pipeline: Promtail Rate Limit stage #5048 #5051
liguozhong Jan 14, 2022
c07bcff
Update docs/sources/clients/promtail/stages/limit.md
cyriltovena Mar 2, 2022
5112902
Update docs/sources/clients/promtail/stages/limit.md
cyriltovena Mar 2, 2022
5ec6018
Merge branch 'main' into rate_limit_stage
liguozhong Mar 4, 2022
a5aaa4e
fix import package
liguozhong Mar 4, 2022
d034653
Update docs/sources/clients/promtail/stages/limit.md
liguozhong Mar 31, 2022
57a5dbe
Update docs/sources/clients/promtail/stages/limit.md
liguozhong Mar 31, 2022
7b5defb
Update docs/sources/clients/promtail/pipelines.md
liguozhong Mar 31, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 202 additions & 0 deletions clients/pkg/logentry/stages/limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package stages

import (
"context"
"fmt"
"reflect"
"regexp"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
)

const (
ErrLimitStageEmptyConfig = "limit stage config must contain at least one of `source`, `expression`, `older_than` or `longer_than`"
ErrLimitStageInvalidConfig = "limit stage config error, `value` and `expression` cannot both be defined at the same time."
ErrLimitStageInvalidRegex = "limit stage regex compilation error: %v"
ErrLimitStageInvalidRateOrBurst = "limit stage failed to parse rate or burst"
)

var ratelimitDropReason = "ratelimit_drop_stage"

type LimitConfig struct {
Source *string `mapstructure:"source"`
Rate float64 `mapstructure:"rate"`
Burst int `mapstructure:"burst"`
Drop bool `mapstructure:"drop"`
Value *string `mapstructure:"value"`
Expression *string `mapstructure:"expression"`
liguozhong marked this conversation as resolved.
Show resolved Hide resolved
regex *regexp.Regexp
}
jeschkies marked this conversation as resolved.
Show resolved Hide resolved

func newLimitStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) {
cfg := &LimitConfig{}

err := mapstructure.WeakDecode(config, cfg)
if err != nil {
return nil, err
}
err = validateLimitConfig(cfg)
if err != nil {
return nil, err
}

r := &limitStage{
logger: log.With(logger, "component", "stage", "type", "limit"),
cfg: cfg,
dropCount: getDropCountMetric(registerer),
rateLimiter: rate.NewLimiter(rate.Limit(cfg.Rate), cfg.Burst),
}
return r, nil
}

func validateLimitConfig(cfg *LimitConfig) error {
if cfg == nil ||
(cfg.Source == nil && cfg.Expression == nil) {
return errors.New(ErrLimitStageEmptyConfig)
}
if cfg.Value != nil && cfg.Expression != nil {
return errors.New(ErrLimitStageInvalidConfig)
}
if cfg.Expression != nil {
expr, err := regexp.Compile(*cfg.Expression)
if err != nil {
return errors.Errorf(ErrLimitStageInvalidRegex, err)
}
cfg.regex = expr
}
if cfg.Rate <= 0 || cfg.Burst <= 0 {
return errors.Errorf(ErrLimitStageInvalidRateOrBurst)
}
return nil
}

// limitStage applies Label matchers to determine if the include stages should be run
type limitStage struct {
logger log.Logger
cfg *LimitConfig
rateLimiter *rate.Limiter
dropCount *prometheus.CounterVec
}

func (m *limitStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)
for e := range in {
if !m.shouldThrottle(e) {
out <- e
continue
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not expect any side-effects from a function named shouldThrottle but only an evaluation whether throttling needs to be applied. What do you think about splitting the check and the actual throttling logic into separate functions?

}
}()
return out
}

func (m *limitStage) shouldThrottle(e Entry) bool {
isMatchTag := m.isMatchTag(e)
if !isMatchTag {
return false
}
if m.cfg.Drop {
if m.rateLimiter.Allow() {
return false
}
m.dropCount.WithLabelValues(ratelimitDropReason).Inc()
return true
}
_ = m.rateLimiter.Wait(context.Background())
liguozhong marked this conversation as resolved.
Show resolved Hide resolved
return false
}

// Name implements Stage
func (m *limitStage) Name() string {
return StageTypeLimit
}

func (m *limitStage) isMatchTag(e Entry) bool {
if m.cfg.Source != nil && m.cfg.Expression == nil {
if v, ok := e.Extracted[*m.cfg.Source]; ok {
if m.cfg.Value == nil {
// Found in map, no value set meaning drop if found in map
if Debug {
level.Debug(m.logger).Log("msg", "line met drop criteria for finding source key in extracted map")
}
} else {
if *m.cfg.Value == v {
// Found in map with value set for drop
if Debug {
level.Debug(m.logger).Log("msg", "line met drop criteria for finding source key in extracted map with value matching desired drop value")
}
} else {
// Value doesn't match, don't drop
if Debug {
level.Debug(m.logger).Log("msg", fmt.Sprintf("line will not be dropped, source key was found in extracted map but value '%v' did not match desired value '%v'", v, *m.cfg.Value))
}
return false
}
}
liguozhong marked this conversation as resolved.
Show resolved Hide resolved
} else {
// Not found in extact map, don't drop
if Debug {
level.Debug(m.logger).Log("msg", "line will not be dropped, the provided source was not found in the extracted map")
}
return false
}
}

if m.cfg.Expression != nil {
if m.cfg.Source != nil {
if v, ok := e.Extracted[*m.cfg.Source]; ok {
s, err := getString(v)
if err != nil {
if Debug {
level.Debug(m.logger).Log("msg", "Failed to convert extracted map value to string, cannot test regex line will not be dropped.", "err", err, "type", reflect.TypeOf(v))
}
return false
}
match := m.cfg.regex.FindStringSubmatch(s)
if match == nil {
// Not a match to the regex, don't drop
if Debug {
level.Debug(m.logger).Log("msg", fmt.Sprintf("line will not be dropped, the provided regular expression did not match the value found in the extracted map for source key: %v", *m.cfg.Source))
}
return false
}
// regex match, will be dropped
if Debug {
level.Debug(m.logger).Log("msg", "line met drop criteria, regex matched the value in the extracted map source key")
}

} else {
// Not found in extact map, don't drop
if Debug {
level.Debug(m.logger).Log("msg", "line will not be dropped, the provided source was not found in the extracted map")
}
return false
}
} else {
match := m.cfg.regex.FindStringSubmatch(e.Line)
if match == nil {
// Not a match to the regex, don't drop
if Debug {
level.Debug(m.logger).Log("msg", "line will not be dropped, the provided regular expression did not match the log line")
}
return false
}
if Debug {
level.Debug(m.logger).Log("msg", "line met drop criteria, the provided regular expression matched the log line")
}
}
}

// Everything matched, drop the line
if Debug {
level.Debug(m.logger).Log("msg", "all criteria met, line will be dropped")
}
return true
}
80 changes: 80 additions & 0 deletions clients/pkg/logentry/stages/limit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package stages

import (
"testing"
"time"

util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// Not all these are tested but are here to make sure the different types marshal without error
var testLimitWaitYaml = `
pipeline_stages:
- json:
expressions:
app:
msg:
- limit:
source: app
value: loki
rate: 1
burst: 1
drop: false
`

// Not all these are tested but are here to make sure the different types marshal without error
var testLimitDropYaml = `
pipeline_stages:
- json:
expressions:
app:
msg:
- limit:
source: app
value: loki
rate: 1
burst: 1
drop: true
`

// TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitWaitPipeline(t *testing.T) {
liguozhong marked this conversation as resolved.
Show resolved Hide resolved
registry := prometheus.NewRegistry()
plName := "testPipeline"
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitWaitYaml), &plName, registry)
logs := make([]Entry, 0)
logCount := 5
for i := 0; i < logCount; i++ {
logs = append(logs, newEntry(nil, model.LabelSet{"app": "loki"}, testMatchLogLineApp1, time.Now()))
}
require.NoError(t, err)
out := processEntries(pl,
logs...,
)
// Only the second line will go through.
assert.Len(t, out, logCount)
assert.Equal(t, out[0].Line, testMatchLogLineApp1)
}

// TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitDropPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
plName := "testPipeline"
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitDropYaml), &plName, registry)
logs := make([]Entry, 0)
logCount := 10
for i := 0; i < logCount; i++ {
logs = append(logs, newEntry(nil, model.LabelSet{"app": "loki"}, testMatchLogLineApp1, time.Now()))
}
require.NoError(t, err)
out := processEntries(pl,
logs...,
)
// Only the second line will go through.
assert.Len(t, out, 1)
assert.Equal(t, out[0].Line, testMatchLogLineApp1)
}
6 changes: 6 additions & 0 deletions clients/pkg/logentry/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
StageTypePipeline = "pipeline"
StageTypeTenant = "tenant"
StageTypeDrop = "drop"
StageTypeLimit = "limit"
StageTypeMultiline = "multiline"
StageTypePack = "pack"
StageTypeLabelAllow = "labelallow"
Expand Down Expand Up @@ -183,6 +184,11 @@ func New(logger log.Logger, jobName *string, stageType string,
if err != nil {
return nil, err
}
case StageTypeLimit:
s, err = newLimitStage(logger, cfg, registerer)
if err != nil {
return nil, err
}
case StageTypeMultiline:
s, err = newMultilineStage(logger, cfg)
if err != nil {
Expand Down