-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[new feature] promtail: Add a Promtail stage for probabilistic sampling #7127
Merged
Merged
Changes from 15 commits
Commits
Show all changes
38 commits
Select commit
Hold shift + click to select a range
dbcf4eb
[new feature] promtail: Add a Promtail stage for probabilistic sampling
liguozhong e71222c
fix lint
liguozhong 1ffdfcf
Update sampling.go
liguozhong ed0c2e8
Update sampling.go
liguozhong d720a63
Update sampling.go
liguozhong b7a0571
Update sampling.go
liguozhong f70180e
Update sampling.go
liguozhong a1ccb59
Update sampling.go
liguozhong a66f641
Update sampling_test.go
liguozhong 1ccb1cb
Update limit_test.go
liguozhong 0a8510f
Update sampling_test.go
liguozhong 5543c7b
fix lint
liguozhong 0ed01d0
Update sampling_test.go
liguozhong 7bd6dda
delete 'dropStage' word
liguozhong 179296d
Merge branch 'main' into promtail_sampling
liguozhong d8a0371
remove pool
liguozhong 2315fa9
Merge branch 'main' into promtail_sampling
liguozhong 3a32d06
Merge branch 'main' into promtail_sampling
liguozhong 4a59de6
fix test
liguozhong 29ad535
add sampling doc
liguozhong 0bbcd58
Rename sampling.go to sampling.md
liguozhong f4ff5ea
Update sampling.md
liguozhong 89d78f6
Update sampling.md
liguozhong ab20011
Update sampling.md
liguozhong b2da069
Update sampling.md
liguozhong 47656a2
Update sampling.md
liguozhong 4502df6
Update sampling.md
liguozhong a91af62
Update sampling.md
liguozhong 0962aed
Update sampling.md
liguozhong f118389
Update sampling.md
liguozhong d3cf0ee
Update sampling.md
liguozhong 3bdda1d
Update sampling.md
liguozhong dbfb86a
Update sampling.md
liguozhong 1c41a4c
Update sampling.md
liguozhong 8fabe08
Update docs/sources/clients/promtail/stages/sampling.md
liguozhong e6eb899
Update docs/sources/clients/promtail/stages/sampling.md
liguozhong 9be2bf7
Update sampling.md
liguozhong b099e8d
Update sampling.md
liguozhong File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
package stages | ||
|
||
import ( | ||
"math" | ||
"math/rand" | ||
"sync" | ||
"time" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/mitchellh/mapstructure" | ||
"github.com/pkg/errors" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/uber/jaeger-client-go/utils" | ||
) | ||
|
||
const (
	// ErrSamplingStageInvalidRate is the message format used when the
	// configured rate is rejected; its single %f verb receives the
	// offending rate.
	ErrSamplingStageInvalidRate = "sampling stage failed to parse rate,Sampling Rate must be between 0.0 and 1.0, received %f"

	// maxRandomNumber is 0x7fffffffffffffff, i.e. every bit set except the
	// top one. It is used both to mask random draws and to scale the
	// sampling rate into an integer boundary.
	maxRandomNumber = uint64(1)<<63 - 1
)

// defaultSamplingpReason is the drop counter reason applied when the
// configuration does not provide a non-empty one.
var defaultSamplingpReason = "sampling_stage"
|
||
// SamplingConfig holds the user-facing settings of a sampling stage.
type SamplingConfig struct {
	// DropReason is decoded from the "drop_counter_reason" key. It is the
	// label value used on the drop counter for entries that are not sampled.
	DropReason *string `mapstructure:"drop_counter_reason"`
	// SamplingRate is decoded from the "rate" key. Validation requires it to
	// lie between 0.0 and 1.0.
	SamplingRate float64 `mapstructure:"rate"`
}
|
||
// validateSamplingConfig validates the SamplingConfig for the sampleStage | ||
func validateSamplingConfig(cfg *SamplingConfig) error { | ||
if cfg.DropReason == nil || *cfg.DropReason == "" { | ||
cfg.DropReason = &defaultSamplingpReason | ||
} | ||
if cfg.SamplingRate < 0.0 || cfg.SamplingRate > 1.0 { | ||
return errors.Errorf(ErrSamplingStageInvalidRate, cfg.SamplingRate) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// newSamplingStage creates a SamplingStage from config | ||
// code from jaeger project. | ||
// github.com/uber/jaeger-client-go@v2.30.0+incompatible/tracer.go:126 | ||
func newSamplingStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) { | ||
cfg := &SamplingConfig{} | ||
err := mapstructure.WeakDecode(config, cfg) | ||
if err != nil { | ||
return nil, err | ||
} | ||
err = validateSamplingConfig(cfg) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
samplingRate := math.Max(0.0, math.Min(cfg.SamplingRate, 1.0)) | ||
samplingBoundary := uint64(float64(maxRandomNumber) * samplingRate) | ||
seedGenerator := utils.NewRand(time.Now().UnixNano()) | ||
return &samplingStage{ | ||
logger: log.With(logger, "component", "stage", "type", "sampling"), | ||
cfg: cfg, | ||
dropCount: getDropCountMetric(registerer), | ||
samplingBoundary: samplingBoundary, | ||
pool: sync.Pool{ | ||
New: func() interface{} { | ||
return rand.NewSource(seedGenerator.Int63()) | ||
}, | ||
}, | ||
}, nil | ||
} | ||
|
||
type samplingStage struct { | ||
logger log.Logger | ||
cfg *SamplingConfig | ||
dropCount *prometheus.CounterVec | ||
samplingBoundary uint64 | ||
pool sync.Pool | ||
} | ||
|
||
func (m *samplingStage) Run(in chan Entry) chan Entry { | ||
out := make(chan Entry) | ||
go func() { | ||
defer close(out) | ||
for e := range in { | ||
if m.isSampled() { | ||
out <- e | ||
continue | ||
} | ||
m.dropCount.WithLabelValues(*m.cfg.DropReason).Inc() | ||
} | ||
}() | ||
return out | ||
} | ||
|
||
// code from jaeger project. | ||
// github.com/uber/jaeger-client-go@v2.30.0+incompatible/sampler.go:144 | ||
// func (s *ProbabilisticSampler) IsSampled(id TraceID, operation string) (bool, []Tag) | ||
func (m *samplingStage) isSampled() bool { | ||
return m.samplingBoundary >= m.randomID()&maxRandomNumber | ||
} | ||
func (m *samplingStage) randomID() uint64 { | ||
val := m.randomNumber() | ||
for val == 0 { | ||
val = m.randomNumber() | ||
} | ||
return val | ||
} | ||
func (m *samplingStage) randomNumber() uint64 { | ||
generator := m.pool.Get().(rand.Source) | ||
number := uint64(generator.Int63()) | ||
m.pool.Put(generator) | ||
return number | ||
} | ||
|
||
// Name implements Stage | ||
func (m *samplingStage) Name() string { | ||
return StageTypeSampling | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
package stages | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
"time" | ||
|
||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
util_log "github.com/grafana/loki/pkg/util/log" | ||
) | ||
|
||
// testSampingYaml is a pipeline configuration with a single sampling stage
// whose rate is 0.5. The nesting matters: `rate` must be a child of
// `sampling` for the stage to receive it.
var testSampingYaml = `
pipeline_stages:
- sampling:
    rate: 0.5
`
|
||
func TestSamplingPipeline(t *testing.T) { | ||
registry := prometheus.NewRegistry() | ||
pl, err := NewPipeline(util_log.Logger, loadConfig(testSampingYaml), &plName, registry) | ||
require.NoError(t, err) | ||
|
||
entries := make([]Entry, 0) | ||
for i := 0; i < 100; i++ { | ||
entries = append(entries, newEntry(nil, nil, testMatchLogLineApp1, time.Now())) | ||
} | ||
|
||
out := processEntries(pl, entries..., | ||
) | ||
// sampling rate = 0.5,entries len = 100, | ||
// The theoretical sample size is 50. | ||
// 50>30 and 50<70 | ||
assert.GreaterOrEqual(t, len(out), 30) | ||
assert.LessOrEqual(t, len(out), 70) | ||
|
||
} | ||
|
||
func Test_validateSamplingConfig(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
config *SamplingConfig | ||
wantErr error | ||
}{ | ||
{ | ||
name: "Invalid rate", | ||
config: &SamplingConfig{ | ||
SamplingRate: 12, | ||
}, | ||
wantErr: fmt.Errorf(ErrSamplingStageInvalidRate, 12.0), | ||
}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
if err := validateSamplingConfig(tt.config); ((err != nil) && (err.Error() != tt.wantErr.Error())) || (err == nil && tt.wantErr != nil) { | ||
t.Errorf("validateDropConfig() error = %v, wantErr = %v", err, tt.wantErr) | ||
} | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am assuming the reference implementation uses a `sync.Pool` to store rand generators because the generator created by `NewSource` is not safe for concurrent use. As an added benefit, generating random numbers from this pool makes use of generators created with different seeds.
But in this stage we have a single goroutine generating the random numbers, which means we don't necessarily need a `sync.Pool`; maybe we should drop it.