diff --git a/clients/pkg/logentry/stages/sampling.go b/clients/pkg/logentry/stages/sampling.go new file mode 100644 index 000000000000..73d4b2540296 --- /dev/null +++ b/clients/pkg/logentry/stages/sampling.go @@ -0,0 +1,113 @@ +package stages + +import ( + "math" + "math/rand" + "time" + + "github.com/go-kit/log" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/uber/jaeger-client-go/utils" +) + +const ( + ErrSamplingStageInvalidRate = "sampling stage failed to parse rate,Sampling Rate must be between 0.0 and 1.0, received %f" +) +const maxRandomNumber = ^(uint64(1) << 63) // i.e. 0x7fffffffffffffff + +var ( + defaultSamplingpReason = "sampling_stage" +) + +// SamplingConfig contains the configuration for a samplingStage +type SamplingConfig struct { + DropReason *string `mapstructure:"drop_counter_reason"` + // + SamplingRate float64 `mapstructure:"rate"` +} + +// validateSamplingConfig validates the SamplingConfig for the sampleStage +func validateSamplingConfig(cfg *SamplingConfig) error { + if cfg.DropReason == nil || *cfg.DropReason == "" { + cfg.DropReason = &defaultSamplingpReason + } + if cfg.SamplingRate < 0.0 || cfg.SamplingRate > 1.0 { + return errors.Errorf(ErrSamplingStageInvalidRate, cfg.SamplingRate) + } + + return nil +} + +// newSamplingStage creates a SamplingStage from config +// code from jaeger project. +// github.com/uber/jaeger-client-go@v2.30.0+incompatible/tracer.go:126 +func newSamplingStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) { + cfg := &SamplingConfig{} + err := mapstructure.WeakDecode(config, cfg) + if err != nil { + return nil, err + } + err = validateSamplingConfig(cfg) + if err != nil { + return nil, err + } + + samplingRate := math.Max(0.0, math.Min(cfg.SamplingRate, 1.0)) + samplingBoundary := uint64(float64(maxRandomNumber) * samplingRate) + seedGenerator := utils.NewRand(time.Now().UnixNano()) + source := rand.NewSource(seedGenerator.Int63()) + return &samplingStage{ + logger: log.With(logger, "component", "stage", "type", "sampling"), + cfg: cfg, + dropCount: getDropCountMetric(registerer), + samplingBoundary: samplingBoundary, + source: source, + }, nil +} + +type samplingStage struct { + logger log.Logger + cfg *SamplingConfig + dropCount *prometheus.CounterVec + samplingBoundary uint64 + source rand.Source +} + +func (m *samplingStage) Run(in chan Entry) chan Entry { + out := make(chan Entry) + go func() { + defer close(out) + for e := range in { + if m.isSampled() { + out <- e + continue + } + m.dropCount.WithLabelValues(*m.cfg.DropReason).Inc() + } + }() + return out +} + +// code from jaeger project. +// github.com/uber/jaeger-client-go@v2.30.0+incompatible/sampler.go:144 +// func (s *ProbabilisticSampler) IsSampled(id TraceID, operation string) (bool, []Tag) +func (m *samplingStage) isSampled() bool { + return m.samplingBoundary >= m.randomID()&maxRandomNumber +} +func (m *samplingStage) randomID() uint64 { + val := m.randomNumber() + for val == 0 { + val = m.randomNumber() + } + return val +} +func (m *samplingStage) randomNumber() uint64 { + return uint64(m.source.Int63()) +} + +// Name implements Stage +func (m *samplingStage) Name() string { + return StageTypeSampling +} diff --git a/clients/pkg/logentry/stages/sampling_test.go b/clients/pkg/logentry/stages/sampling_test.go new file mode 100644 index 000000000000..171277e961d6 --- /dev/null +++ b/clients/pkg/logentry/stages/sampling_test.go @@ -0,0 +1,62 @@ +package stages + +import ( + "fmt" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + util_log "github.com/grafana/loki/pkg/util/log" +) + +var testSampingYaml = ` +pipeline_stages: +- sampling: + rate: 0.5 +` + +func TestSamplingPipeline(t *testing.T) { + registry := prometheus.NewRegistry() + pl, err := NewPipeline(util_log.Logger, loadConfig(testSampingYaml), &plName, registry) + require.NoError(t, err) + + entries := make([]Entry, 0) + for i := 0; i < 100; i++ { + entries = append(entries, newEntry(nil, nil, testMatchLogLineApp1, time.Now())) + } + + out := processEntries(pl, entries..., + ) + // sampling rate = 0.5,entries len = 100, + // The theoretical sample size is 50. + // 50>30 and 50<70 + assert.GreaterOrEqual(t, len(out), 30) + assert.LessOrEqual(t, len(out), 70) + +} + +func Test_validateSamplingConfig(t *testing.T) { + tests := []struct { + name string + config *SamplingConfig + wantErr error + }{ + { + name: "Invalid rate", + config: &SamplingConfig{ + SamplingRate: 12, + }, + wantErr: fmt.Errorf(ErrSamplingStageInvalidRate, 12.0), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := validateSamplingConfig(tt.config); ((err != nil) && (err.Error() != tt.wantErr.Error())) || (err == nil && tt.wantErr != nil) { + t.Errorf("validateDropConfig() error = %v, wantErr = %v", err, tt.wantErr) + } + }) + } +} diff --git a/clients/pkg/logentry/stages/stage.go b/clients/pkg/logentry/stages/stage.go index da0e9dadca17..3802f9d37ecd 100644 --- a/clients/pkg/logentry/stages/stage.go +++ b/clients/pkg/logentry/stages/stage.go @@ -31,6 +31,7 @@ const ( StageTypePipeline = "pipeline" StageTypeTenant = "tenant" StageTypeDrop = "drop" + StageTypeSampling = "sampling" StageTypeLimit = "limit" StageTypeMultiline = "multiline" StageTypePack = "pack" @@ -187,6 +188,11 @@ func New(logger log.Logger, jobName *string, stageType string, if err != nil { return nil, err } + case StageTypeSampling: + s, err = newSamplingStage(logger, cfg, registerer) + if err != nil { + return nil, err + } case StageTypeLimit: s, err = newLimitStage(logger, cfg, registerer) if err != nil { diff --git a/docs/sources/clients/promtail/stages/_index.md b/docs/sources/clients/promtail/stages/_index.md index 03b9ef19b5dd..e646b91d230d 100644 --- a/docs/sources/clients/promtail/stages/_index.md +++ b/docs/sources/clients/promtail/stages/_index.md @@ -32,6 +32,7 @@ Action stages: - [labelallow]({{}}): Allow label set for the log entry. - [labels]({{}}): Update the label set for the log entry. - [limit]({{}}): Limit the rate lines will be sent to Loki. + - [sampling]({{}}): Sampling the lines will be sent to Loki. - [static_labels]({{}}): Add static-labels to the log entry. - [metrics]({{}}): Calculate metrics based on extracted data. - [tenant]({{}}): Set the tenant ID value to use for the log entry. diff --git a/docs/sources/clients/promtail/stages/sampling.md b/docs/sources/clients/promtail/stages/sampling.md new file mode 100644 index 000000000000..f0bc30413728 --- /dev/null +++ b/docs/sources/clients/promtail/stages/sampling.md @@ -0,0 +1,51 @@ +--- +title: sampling +description: sampling stage +--- +# sampling + +The `sampling` stage is a stage that sampling logs. + +## Sampling stage schema + +The `sampling` stage is used to sampling the logs. Configuring the value `rate: 0.1` means that 10% of the logs will be pushed to the Loki server. + +```yaml +sampling: + # The rate sampling in lines per second that Promtail will push to Loki.The value is between 0 and 1, where a value of 0 means no logs are sampled and a value of 1 means 100% of logs are sampled. + [rate: ] +``` + +## Examples + +The following are examples showing the use of the `sampling` stage. + +### sampling + +#### Simple sampling + +Given the pipeline: + +```yaml +- sampling: + rate: 0.1 +``` + +#### Match a line and sampling + +Given the pipeline: + +```yaml +pipeline_stages: +- json: + expressions: + app: +- match: + pipeline_name: "app2" + selector: "{app="poki"}" + stages: + - sampling: + rate: 0.1 +``` +Complex `sampling` stage configurations. +