Skip to content

Commit

Permalink
[new feature] promtail: Add a Promtail stage for probabilistic sampli…
Browse files Browse the repository at this point in the history
…ng (#7127)

<!--  Thanks for sending a pull request!  Before submitting:

1. Read our CONTRIBUTING.md guide
2. Name your PR as `<Feature Area>: Describe your change`.
a. Do not end the title with punctuation. It will be added in the
changelog.
b. Start with an imperative verb. Example: Fix the latency between
System A and System B.
  c. Use sentence case, not title case.
d. Use a complete phrase or sentence. The PR title will appear in a
changelog, so help other people understand what your change will be.
3. Rebase your PR if it gets out of sync with main
-->

**What this PR does / why we need it**:

The sampling stage can be directly sampled.
The implementation of sampling is to use the algorithm in jaeger go
client
```
pipeline_stages:
- sampling:
     rate: 0.1
```
or it can be used with match for precise sampling.
```
pipeline_stages:
- json:
    expressions:
      app:
- match:
    pipeline_name: "app2"
    selector: "{app=\"poki\"}"
    stages:
    - sampling:
        rate: 0.1
```

**Which issue(s) this PR fixes**:
Fixes #6654

**Special notes for your reviewer**:

The promtail 'rate' stage is also used with the 'match' stage for log
filtering.This design makes the code very clean.
Other log agents vector have log filtering built into the sampling
operator, which I think is too complicated
https://vector.dev/docs/reference/configuration/transforms/sample/
```
 transforms:
  my_transform_id:
    type: sample
    inputs:
      - my-source-or-transform-id
    exclude: null
    rate: 10
```

'rate' stage review suggestions .
#5051

![image](https://user-images.githubusercontent.com/9583245/189461481-6ee4d835-2573-4b8e-8dec-2814620d758a.png)

<!--
Note about CHANGELOG entries, if a change adds:
* an important feature
* fixes an issue present in a previous release, 
* causes a change in operation that would be useful for an operator of
Loki to know
then please add a CHANGELOG entry.

For documentation changes, build changes, simple fixes etc please skip
this step. We are attempting to curate a changelog of the most relevant
and important changes to be easier to ingest by end users of Loki.

Note about the upgrade guide, if this changes:
* default configuration values
* metric names or label names
* changes existing log lines such as the metrics.go query output line
* configuration parameters 
* anything to do with any API
* any other change that would require special attention or extra steps
to upgrade
Please document clearly what changed AND what needs to be done in the
upgrade guide.
-->
**Checklist**
- [x] Documentation added
- [x] Tests updated
- [ ] Is this an important fix or new feature? Add an entry in the
`CHANGELOG.md`.
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`

---------

Co-authored-by: J Stickler <julie.stickler@grafana.com>
  • Loading branch information
liguozhong and JStickler authored Mar 17, 2023
1 parent 53b9a35 commit 0295fd4
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 0 deletions.
113 changes: 113 additions & 0 deletions clients/pkg/logentry/stages/sampling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package stages

import (
"math"
"math/rand"
"time"

"github.com/go-kit/log"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/uber/jaeger-client-go/utils"
)

const (
ErrSamplingStageInvalidRate = "sampling stage failed to parse rate,Sampling Rate must be between 0.0 and 1.0, received %f"
)
const maxRandomNumber = ^(uint64(1) << 63) // i.e. 0x7fffffffffffffff

var (
defaultSamplingpReason = "sampling_stage"
)

// SamplingConfig contains the configuration for a samplingStage
type SamplingConfig struct {
DropReason *string `mapstructure:"drop_counter_reason"`
//
SamplingRate float64 `mapstructure:"rate"`
}

// validateSamplingConfig validates the SamplingConfig for the sampleStage
func validateSamplingConfig(cfg *SamplingConfig) error {
if cfg.DropReason == nil || *cfg.DropReason == "" {
cfg.DropReason = &defaultSamplingpReason
}
if cfg.SamplingRate < 0.0 || cfg.SamplingRate > 1.0 {
return errors.Errorf(ErrSamplingStageInvalidRate, cfg.SamplingRate)
}

return nil
}

// newSamplingStage creates a SamplingStage from config
// code from jaeger project.
// github.com/uber/jaeger-client-go@v2.30.0+incompatible/tracer.go:126
func newSamplingStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) {
cfg := &SamplingConfig{}
err := mapstructure.WeakDecode(config, cfg)
if err != nil {
return nil, err
}
err = validateSamplingConfig(cfg)
if err != nil {
return nil, err
}

samplingRate := math.Max(0.0, math.Min(cfg.SamplingRate, 1.0))
samplingBoundary := uint64(float64(maxRandomNumber) * samplingRate)
seedGenerator := utils.NewRand(time.Now().UnixNano())
source := rand.NewSource(seedGenerator.Int63())
return &samplingStage{
logger: log.With(logger, "component", "stage", "type", "sampling"),
cfg: cfg,
dropCount: getDropCountMetric(registerer),
samplingBoundary: samplingBoundary,
source: source,
}, nil
}

type samplingStage struct {
logger log.Logger
cfg *SamplingConfig
dropCount *prometheus.CounterVec
samplingBoundary uint64
source rand.Source
}

func (m *samplingStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)
for e := range in {
if m.isSampled() {
out <- e
continue
}
m.dropCount.WithLabelValues(*m.cfg.DropReason).Inc()
}
}()
return out
}

// code from jaeger project.
// github.com/uber/jaeger-client-go@v2.30.0+incompatible/sampler.go:144
// func (s *ProbabilisticSampler) IsSampled(id TraceID, operation string) (bool, []Tag)
func (m *samplingStage) isSampled() bool {
return m.samplingBoundary >= m.randomID()&maxRandomNumber
}
func (m *samplingStage) randomID() uint64 {
val := m.randomNumber()
for val == 0 {
val = m.randomNumber()
}
return val
}
func (m *samplingStage) randomNumber() uint64 {
return uint64(m.source.Int63())
}

// Name implements Stage
func (m *samplingStage) Name() string {
return StageTypeSampling
}
62 changes: 62 additions & 0 deletions clients/pkg/logentry/stages/sampling_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package stages

import (
"fmt"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

util_log "github.com/grafana/loki/pkg/util/log"
)

var testSampingYaml = `
pipeline_stages:
- sampling:
rate: 0.5
`

func TestSamplingPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
pl, err := NewPipeline(util_log.Logger, loadConfig(testSampingYaml), &plName, registry)
require.NoError(t, err)

entries := make([]Entry, 0)
for i := 0; i < 100; i++ {
entries = append(entries, newEntry(nil, nil, testMatchLogLineApp1, time.Now()))
}

out := processEntries(pl, entries...,
)
// sampling rate = 0.5,entries len = 100,
// The theoretical sample size is 50.
// 50>30 and 50<70
assert.GreaterOrEqual(t, len(out), 30)
assert.LessOrEqual(t, len(out), 70)

}

func Test_validateSamplingConfig(t *testing.T) {
tests := []struct {
name string
config *SamplingConfig
wantErr error
}{
{
name: "Invalid rate",
config: &SamplingConfig{
SamplingRate: 12,
},
wantErr: fmt.Errorf(ErrSamplingStageInvalidRate, 12.0),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := validateSamplingConfig(tt.config); ((err != nil) && (err.Error() != tt.wantErr.Error())) || (err == nil && tt.wantErr != nil) {
t.Errorf("validateDropConfig() error = %v, wantErr = %v", err, tt.wantErr)
}
})
}
}
6 changes: 6 additions & 0 deletions clients/pkg/logentry/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
StageTypePipeline = "pipeline"
StageTypeTenant = "tenant"
StageTypeDrop = "drop"
StageTypeSampling = "sampling"
StageTypeLimit = "limit"
StageTypeMultiline = "multiline"
StageTypePack = "pack"
Expand Down Expand Up @@ -187,6 +188,11 @@ func New(logger log.Logger, jobName *string, stageType string,
if err != nil {
return nil, err
}
case StageTypeSampling:
s, err = newSamplingStage(logger, cfg, registerer)
if err != nil {
return nil, err
}
case StageTypeLimit:
s, err = newLimitStage(logger, cfg, registerer)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions docs/sources/clients/promtail/stages/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Action stages:
- [labelallow]({{<relref "labelallow">}}): Allow label set for the log entry.
- [labels]({{<relref "labels">}}): Update the label set for the log entry.
- [limit]({{<relref "limit">}}): Limit the rate lines will be sent to Loki.
- [sampling]({{<relref "sampling">}}): Sampling the lines will be sent to Loki.
- [static_labels]({{<relref "static_labels">}}): Add static-labels to the log entry.
- [metrics]({{<relref "metrics">}}): Calculate metrics based on extracted data.
- [tenant]({{<relref "tenant">}}): Set the tenant ID value to use for the log entry.
Expand Down
51 changes: 51 additions & 0 deletions docs/sources/clients/promtail/stages/sampling.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
---
title: sampling
description: sampling stage
---
# sampling

The `sampling` stage is a stage that sampling logs.

## Sampling stage schema

The `sampling` stage is used to sampling the logs. Configuring the value `rate: 0.1` means that 10% of the logs will be pushed to the Loki server.

```yaml
sampling:
# The rate sampling in lines per second that Promtail will push to Loki.The value is between 0 and 1, where a value of 0 means no logs are sampled and a value of 1 means 100% of logs are sampled.
[rate: <int>]
```
## Examples
The following are examples showing the use of the `sampling` stage.

### sampling

#### Simple sampling

Given the pipeline:

```yaml
- sampling:
rate: 0.1
```

#### Match a line and sampling

Given the pipeline:

```yaml
pipeline_stages:
- json:
expressions:
app:
- match:
pipeline_name: "app2"
selector: "{app="poki"}"
stages:
- sampling:
rate: 0.1
```
Complex `sampling` stage configurations.

0 comments on commit 0295fd4

Please sign in to comment.