Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[new feature] promtail: Add a Promtail stage for probabilistic sampling #7127

Merged
merged 38 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
dbcf4eb
[new feature] promtail: Add a Promtail stage for probabilistic sampling
liguozhong Sep 10, 2022
e71222c
fix lint
liguozhong Sep 10, 2022
1ffdfcf
Update sampling.go
liguozhong Sep 10, 2022
ed0c2e8
Update sampling.go
liguozhong Sep 10, 2022
d720a63
Update sampling.go
liguozhong Sep 10, 2022
b7a0571
Update sampling.go
liguozhong Sep 10, 2022
f70180e
Update sampling.go
liguozhong Sep 10, 2022
a1ccb59
Update sampling.go
liguozhong Sep 10, 2022
a66f641
Update sampling_test.go
liguozhong Sep 10, 2022
1ccb1cb
Update limit_test.go
liguozhong Sep 10, 2022
0a8510f
Update sampling_test.go
liguozhong Sep 10, 2022
5543c7b
fix lint
liguozhong Sep 10, 2022
0ed01d0
Update sampling_test.go
liguozhong Sep 10, 2022
7bd6dda
delete 'dropStage' word
liguozhong Sep 13, 2022
179296d
Merge branch 'main' into promtail_sampling
liguozhong Sep 13, 2022
d8a0371
remove pool
liguozhong Sep 14, 2022
2315fa9
Merge branch 'main' into promtail_sampling
liguozhong Sep 14, 2022
3a32d06
Merge branch 'main' into promtail_sampling
liguozhong Feb 9, 2023
4a59de6
fix test
liguozhong Feb 9, 2023
29ad535
add sampling doc
liguozhong Feb 9, 2023
0bbcd58
Rename sampling.go to sampling.md
liguozhong Feb 9, 2023
f4ff5ea
Update sampling.md
liguozhong Feb 9, 2023
89d78f6
Update sampling.md
liguozhong Feb 9, 2023
ab20011
Update sampling.md
liguozhong Feb 9, 2023
b2da069
Update sampling.md
liguozhong Feb 9, 2023
47656a2
Update sampling.md
liguozhong Feb 9, 2023
4502df6
Update sampling.md
liguozhong Feb 9, 2023
a91af62
Update sampling.md
liguozhong Feb 9, 2023
0962aed
Update sampling.md
liguozhong Feb 9, 2023
f118389
Update sampling.md
liguozhong Feb 9, 2023
d3cf0ee
Update sampling.md
liguozhong Feb 9, 2023
3bdda1d
Update sampling.md
liguozhong Feb 9, 2023
dbfb86a
Update sampling.md
liguozhong Feb 9, 2023
1c41a4c
Update sampling.md
liguozhong Feb 9, 2023
8fabe08
Update docs/sources/clients/promtail/stages/sampling.md
liguozhong Feb 10, 2023
e6eb899
Update docs/sources/clients/promtail/stages/sampling.md
liguozhong Feb 10, 2023
9be2bf7
Update sampling.md
liguozhong Feb 10, 2023
b099e8d
Update sampling.md
liguozhong Feb 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions clients/pkg/logentry/stages/limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ pipeline_stages:
burst: 1
drop: true
`
var plName = "testPipeline"

// TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitWaitPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
plName := "testPipeline"
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitWaitYaml), &plName, registry)
logs := make([]Entry, 0)
logCount := 5
Expand All @@ -60,7 +60,6 @@ func TestLimitWaitPipeline(t *testing.T) {
// TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitDropPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
plName := "testPipeline"
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitDropYaml), &plName, registry)
logs := make([]Entry, 0)
logCount := 10
Expand Down
120 changes: 120 additions & 0 deletions clients/pkg/logentry/stages/sampling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package stages

import (
"math"
"math/rand"
"sync"
"time"

"github.com/go-kit/log"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/uber/jaeger-client-go/utils"
)

const (
ErrSamplingStageInvalidRate = "sampling stage failed to parse rate,Sampling Rate must be between 0.0 and 1.0, received %f"
)
const maxRandomNumber = ^(uint64(1) << 63) // i.e. 0x7fffffffffffffff

var (
defaultSamplingpReason = "sampling_stage"
)

// SamplingConfig contains the configuration for a samplingStage
type SamplingConfig struct {
DropReason *string `mapstructure:"drop_counter_reason"`
//
SamplingRate float64 `mapstructure:"rate"`
}

// validateSamplingConfig validates the SamplingConfig for the sampleStage
func validateSamplingConfig(cfg *SamplingConfig) error {
if cfg.DropReason == nil || *cfg.DropReason == "" {
cfg.DropReason = &defaultSamplingpReason
}
if cfg.SamplingRate < 0.0 || cfg.SamplingRate > 1.0 {
return errors.Errorf(ErrSamplingStageInvalidRate, cfg.SamplingRate)
}

return nil
}

// newSamplingStage creates a SamplingStage from config
// code from jaeger project.
// github.com/uber/jaeger-client-go@v2.30.0+incompatible/tracer.go:126
func newSamplingStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) {
cfg := &SamplingConfig{}
err := mapstructure.WeakDecode(config, cfg)
if err != nil {
return nil, err
}
err = validateSamplingConfig(cfg)
if err != nil {
return nil, err
}

samplingRate := math.Max(0.0, math.Min(cfg.SamplingRate, 1.0))
samplingBoundary := uint64(float64(maxRandomNumber) * samplingRate)
seedGenerator := utils.NewRand(time.Now().UnixNano())
return &samplingStage{
logger: log.With(logger, "component", "stage", "type", "sampling"),
cfg: cfg,
dropCount: getDropCountMetric(registerer),
samplingBoundary: samplingBoundary,
pool: sync.Pool{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am assuming the reference implementation is using a sync.Pool for storing rand generators since the generator created by NewSource is not safe for concurrent use. And as an added benefit generating random number from this pool would make use of generators created with different seeds.

But in this stage, we have a single go routine that's generating the random numbers which means we don't necessarily need a sync.Pool maybe we should drop this

New: func() interface{} {
return rand.NewSource(seedGenerator.Int63())
},
},
}, nil
}

type samplingStage struct {
logger log.Logger
cfg *SamplingConfig
dropCount *prometheus.CounterVec
samplingBoundary uint64
pool sync.Pool
}

func (m *samplingStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)
for e := range in {
if m.isSampled() {
out <- e
continue
}
m.dropCount.WithLabelValues(*m.cfg.DropReason).Inc()
}
}()
return out
}

// code from jaeger project.
// github.com/uber/jaeger-client-go@v2.30.0+incompatible/sampler.go:144
// func (s *ProbabilisticSampler) IsSampled(id TraceID, operation string) (bool, []Tag)
func (m *samplingStage) isSampled() bool {
return m.samplingBoundary >= m.randomID()&maxRandomNumber
}
func (m *samplingStage) randomID() uint64 {
val := m.randomNumber()
for val == 0 {
val = m.randomNumber()
}
return val
}
func (m *samplingStage) randomNumber() uint64 {
generator := m.pool.Get().(rand.Source)
number := uint64(generator.Int63())
m.pool.Put(generator)
return number
}

// Name implements Stage
func (m *samplingStage) Name() string {
return StageTypeSampling
}
62 changes: 62 additions & 0 deletions clients/pkg/logentry/stages/sampling_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package stages

import (
"fmt"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

util_log "github.com/grafana/loki/pkg/util/log"
)

var testSampingYaml = `
pipeline_stages:
- sampling:
rate: 0.5
`

func TestSamplingPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
pl, err := NewPipeline(util_log.Logger, loadConfig(testSampingYaml), &plName, registry)
require.NoError(t, err)

entries := make([]Entry, 0)
for i := 0; i < 100; i++ {
entries = append(entries, newEntry(nil, nil, testMatchLogLineApp1, time.Now()))
}

out := processEntries(pl, entries...,
)
// sampling rate = 0.5,entries len = 100,
// The theoretical sample size is 50.
// 50>30 and 50<70
assert.GreaterOrEqual(t, len(out), 30)
assert.LessOrEqual(t, len(out), 70)

}

func Test_validateSamplingConfig(t *testing.T) {
tests := []struct {
name string
config *SamplingConfig
wantErr error
}{
{
name: "Invalid rate",
config: &SamplingConfig{
SamplingRate: 12,
},
wantErr: fmt.Errorf(ErrSamplingStageInvalidRate, 12.0),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := validateSamplingConfig(tt.config); ((err != nil) && (err.Error() != tt.wantErr.Error())) || (err == nil && tt.wantErr != nil) {
t.Errorf("validateDropConfig() error = %v, wantErr = %v", err, tt.wantErr)
}
})
}
}
6 changes: 6 additions & 0 deletions clients/pkg/logentry/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
StageTypeTenant = "tenant"
StageTypeDrop = "drop"
StageTypeLimit = "limit"
StageTypeSampling = "sampling"
StageTypeMultiline = "multiline"
StageTypePack = "pack"
StageTypeLabelAllow = "labelallow"
Expand Down Expand Up @@ -184,6 +185,11 @@ func New(logger log.Logger, jobName *string, stageType string,
if err != nil {
return nil, err
}
case StageTypeSampling:
s, err = newSamplingStage(logger, cfg, registerer)
if err != nil {
return nil, err
}
case StageTypeLimit:
s, err = newLimitStage(logger, cfg, registerer)
if err != nil {
Expand Down