Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[new feature] promtail: Add a Promtail stage for probabilistic sampling #7127

Merged
merged 38 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
dbcf4eb
[new feature] promtail: Add a Promtail stage for probabilistic sampling
liguozhong Sep 10, 2022
e71222c
fix lint
liguozhong Sep 10, 2022
1ffdfcf
Update sampling.go
liguozhong Sep 10, 2022
ed0c2e8
Update sampling.go
liguozhong Sep 10, 2022
d720a63
Update sampling.go
liguozhong Sep 10, 2022
b7a0571
Update sampling.go
liguozhong Sep 10, 2022
f70180e
Update sampling.go
liguozhong Sep 10, 2022
a1ccb59
Update sampling.go
liguozhong Sep 10, 2022
a66f641
Update sampling_test.go
liguozhong Sep 10, 2022
1ccb1cb
Update limit_test.go
liguozhong Sep 10, 2022
0a8510f
Update sampling_test.go
liguozhong Sep 10, 2022
5543c7b
fix lint
liguozhong Sep 10, 2022
0ed01d0
Update sampling_test.go
liguozhong Sep 10, 2022
7bd6dda
delete 'dropStage' word
liguozhong Sep 13, 2022
179296d
Merge branch 'main' into promtail_sampling
liguozhong Sep 13, 2022
d8a0371
remove pool
liguozhong Sep 14, 2022
2315fa9
Merge branch 'main' into promtail_sampling
liguozhong Sep 14, 2022
3a32d06
Merge branch 'main' into promtail_sampling
liguozhong Feb 9, 2023
4a59de6
fix test
liguozhong Feb 9, 2023
29ad535
add sampling doc
liguozhong Feb 9, 2023
0bbcd58
Rename sampling.go to sampling.md
liguozhong Feb 9, 2023
f4ff5ea
Update sampling.md
liguozhong Feb 9, 2023
89d78f6
Update sampling.md
liguozhong Feb 9, 2023
ab20011
Update sampling.md
liguozhong Feb 9, 2023
b2da069
Update sampling.md
liguozhong Feb 9, 2023
47656a2
Update sampling.md
liguozhong Feb 9, 2023
4502df6
Update sampling.md
liguozhong Feb 9, 2023
a91af62
Update sampling.md
liguozhong Feb 9, 2023
0962aed
Update sampling.md
liguozhong Feb 9, 2023
f118389
Update sampling.md
liguozhong Feb 9, 2023
d3cf0ee
Update sampling.md
liguozhong Feb 9, 2023
3bdda1d
Update sampling.md
liguozhong Feb 9, 2023
dbfb86a
Update sampling.md
liguozhong Feb 9, 2023
1c41a4c
Update sampling.md
liguozhong Feb 9, 2023
8fabe08
Update docs/sources/clients/promtail/stages/sampling.md
liguozhong Feb 10, 2023
e6eb899
Update docs/sources/clients/promtail/stages/sampling.md
liguozhong Feb 10, 2023
9be2bf7
Update sampling.md
liguozhong Feb 10, 2023
b099e8d
Update sampling.md
liguozhong Feb 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions clients/pkg/logentry/stages/sampling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package stages

import (
"math"
"math/rand"
"time"

"github.com/go-kit/log"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/uber/jaeger-client-go/utils"
)

const (
ErrSamplingStageInvalidRate = "sampling stage failed to parse rate,Sampling Rate must be between 0.0 and 1.0, received %f"
)
const maxRandomNumber = ^(uint64(1) << 63) // i.e. 0x7fffffffffffffff

var (
defaultSamplingpReason = "sampling_stage"
)

// SamplingConfig contains the configuration for a samplingStage
type SamplingConfig struct {
DropReason *string `mapstructure:"drop_counter_reason"`
//
SamplingRate float64 `mapstructure:"rate"`
}

// validateSamplingConfig validates the SamplingConfig for the sampleStage
func validateSamplingConfig(cfg *SamplingConfig) error {
if cfg.DropReason == nil || *cfg.DropReason == "" {
cfg.DropReason = &defaultSamplingpReason
}
if cfg.SamplingRate < 0.0 || cfg.SamplingRate > 1.0 {
return errors.Errorf(ErrSamplingStageInvalidRate, cfg.SamplingRate)
}

return nil
}

// newSamplingStage creates a SamplingStage from config
// code from jaeger project.
// github.com/uber/jaeger-client-go@v2.30.0+incompatible/tracer.go:126
func newSamplingStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) {
cfg := &SamplingConfig{}
err := mapstructure.WeakDecode(config, cfg)
if err != nil {
return nil, err
}
err = validateSamplingConfig(cfg)
if err != nil {
return nil, err
}

samplingRate := math.Max(0.0, math.Min(cfg.SamplingRate, 1.0))
samplingBoundary := uint64(float64(maxRandomNumber) * samplingRate)
seedGenerator := utils.NewRand(time.Now().UnixNano())
source := rand.NewSource(seedGenerator.Int63())
return &samplingStage{
logger: log.With(logger, "component", "stage", "type", "sampling"),
cfg: cfg,
dropCount: getDropCountMetric(registerer),
samplingBoundary: samplingBoundary,
source: source,
}, nil
}

type samplingStage struct {
logger log.Logger
cfg *SamplingConfig
dropCount *prometheus.CounterVec
samplingBoundary uint64
source rand.Source
}

func (m *samplingStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)
for e := range in {
if m.isSampled() {
out <- e
continue
}
m.dropCount.WithLabelValues(*m.cfg.DropReason).Inc()
}
}()
return out
}

// code from jaeger project.
// github.com/uber/jaeger-client-go@v2.30.0+incompatible/sampler.go:144
// func (s *ProbabilisticSampler) IsSampled(id TraceID, operation string) (bool, []Tag)
func (m *samplingStage) isSampled() bool {
return m.samplingBoundary >= m.randomID()&maxRandomNumber
}
func (m *samplingStage) randomID() uint64 {
val := m.randomNumber()
for val == 0 {
val = m.randomNumber()
}
return val
}
func (m *samplingStage) randomNumber() uint64 {
return uint64(m.source.Int63())
}

// Name implements Stage
func (m *samplingStage) Name() string {
return StageTypeSampling
}
62 changes: 62 additions & 0 deletions clients/pkg/logentry/stages/sampling_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package stages

import (
"fmt"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

util_log "github.com/grafana/loki/pkg/util/log"
)

var testSampingYaml = `
pipeline_stages:
- sampling:
rate: 0.5
`

func TestSamplingPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
pl, err := NewPipeline(util_log.Logger, loadConfig(testSampingYaml), &plName, registry)
require.NoError(t, err)

entries := make([]Entry, 0)
for i := 0; i < 100; i++ {
entries = append(entries, newEntry(nil, nil, testMatchLogLineApp1, time.Now()))
}

out := processEntries(pl, entries...,
)
// sampling rate = 0.5,entries len = 100,
// The theoretical sample size is 50.
// 50>30 and 50<70
assert.GreaterOrEqual(t, len(out), 30)
assert.LessOrEqual(t, len(out), 70)

}

func Test_validateSamplingConfig(t *testing.T) {
tests := []struct {
name string
config *SamplingConfig
wantErr error
}{
{
name: "Invalid rate",
config: &SamplingConfig{
SamplingRate: 12,
},
wantErr: fmt.Errorf(ErrSamplingStageInvalidRate, 12.0),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := validateSamplingConfig(tt.config); ((err != nil) && (err.Error() != tt.wantErr.Error())) || (err == nil && tt.wantErr != nil) {
t.Errorf("validateDropConfig() error = %v, wantErr = %v", err, tt.wantErr)
}
})
}
}
6 changes: 6 additions & 0 deletions clients/pkg/logentry/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
StageTypePipeline = "pipeline"
StageTypeTenant = "tenant"
StageTypeDrop = "drop"
StageTypeSampling = "sampling"
StageTypeLimit = "limit"
StageTypeMultiline = "multiline"
StageTypePack = "pack"
Expand Down Expand Up @@ -186,6 +187,11 @@ func New(logger log.Logger, jobName *string, stageType string,
if err != nil {
return nil, err
}
case StageTypeSampling:
s, err = newSamplingStage(logger, cfg, registerer)
if err != nil {
return nil, err
}
case StageTypeLimit:
s, err = newLimitStage(logger, cfg, registerer)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions docs/sources/clients/promtail/stages/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Action stages:
- [labelallow]({{<relref "labelallow">}}): Allow label set for the log entry.
- [labels]({{<relref "labels">}}): Update the label set for the log entry.
- [limit]({{<relref "limit">}}): Limit the rate lines will be sent to Loki.
- [sampling]({{<relref "sampling">}}): Sampling the lines will be sent to Loki.
- [static_labels]({{<relref "static_labels">}}): Add static-labels to the log entry.
- [metrics]({{<relref "metrics">}}): Calculate metrics based on extracted data.
- [tenant]({{<relref "tenant">}}): Set the tenant ID value to use for the log entry.
Expand Down
34 changes: 34 additions & 0 deletions docs/sources/clients/promtail/stages/sampling.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
---
title: sampling
description: sampling stage
---
# sampling

The `sampling` stage is a stage that sampling logs.

## Sampling stage schema

The `sampling` stage is used to sampling the logs.rate: 0.1 means that 10% of the logs can be pushed to the loki server.
liguozhong marked this conversation as resolved.
Show resolved Hide resolved

```yaml
sampling:
# The rate sampling in lines per second that Promtail will push to Loki.The value is between 0 and 1.
liguozhong marked this conversation as resolved.
Show resolved Hide resolved
[rate: <int>]
```

## Examples

The following are examples showing the use of the `sampling` stage.

### sampling

Simple `sampling` stage configurations.

#### Match a line and sampling
JStickler marked this conversation as resolved.
Show resolved Hide resolved

Given the pipeline:

```yaml
- sampling:
rate: 0.1
```