diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 38502dc2caae..ecb9624340d5 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -606,6 +606,10 @@ pattern_ingester: # CLI flag: -pattern-ingester.metric-aggregation.log-push-observations [log_push_observations: | default = false] + # How often to downsample metrics from raw push observations. + # CLI flag: -pattern-ingester.downsample-period + [downsample_period: | default = 10s] + # The index_gateway block configures the Loki index gateway server, responsible # for serving index queries without the need to constantly interact with the # object store. diff --git a/pkg/pattern/flush.go b/pkg/pattern/flush.go index d53b486a168c..e6ae90d20ff5 100644 --- a/pkg/pattern/flush.go +++ b/pkg/pattern/flush.go @@ -27,6 +27,7 @@ func (i *Ingester) Flush() { func (i *Ingester) flush(mayRemoveStreams bool) { i.sweepUsers(true, mayRemoveStreams) + i.downsampleMetrics(model.Now()) // Close the flush queues, to unblock waiting workers. for _, flushQueue := range i.flushQueues { @@ -73,3 +74,20 @@ func (i *Ingester) sweepInstance(instance *instance, _, mayRemoveStreams bool) { return true, nil }) } + +func (i *Ingester) downsampleMetrics(ts model.Time) { + instances := i.getInstances() + + for _, instance := range instances { + i.downsampleInstance(instance, ts) + } +} + +func (i *Ingester) downsampleInstance(instance *instance, ts model.Time) { + _ = instance.streams.ForEach(func(s *stream) (bool, error) { + instance.streams.WithLock(func() { + s.Downsample(ts) + }) + return true, nil + }) +} diff --git a/pkg/pattern/ingester.go b/pkg/pattern/ingester.go index 15972fec24ba..a45298228c33 100644 --- a/pkg/pattern/ingester.go +++ b/pkg/pattern/ingester.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/dskit/services" "github.com/grafana/dskit/tenant" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "google.golang.org/grpc/health/grpc_health_v1" ring_client "github.com/grafana/dskit/ring/client" @@ -206,13 +207,33 @@ func (i *Ingester) loop() { flushTicker := util.NewTickerWithJitter(i.cfg.FlushCheckPeriod, j) defer flushTicker.Stop() - for { - select { - case <-flushTicker.C: - i.sweepUsers(false, true) + if i.cfg.MetricAggregation.Enabled { + downsampleTicker := time.NewTimer(i.cfg.MetricAggregation.DownsamplePeriod) + defer downsampleTicker.Stop() - case <-i.loopQuit: - return + for { + select { + case <-flushTicker.C: + i.sweepUsers(false, true) + + case t := <-downsampleTicker.C: + downsampleTicker.Reset(i.cfg.MetricAggregation.DownsamplePeriod) + now := model.TimeFromUnixNano(t.UnixNano()) + i.downsampleMetrics(now) + + case <-i.loopQuit: + return + } + } + } else { + for { + select { + case <-flushTicker.C: + i.sweepUsers(false, true) + + case <-i.loopQuit: + return + } } } } diff --git a/pkg/pattern/ingester_test.go b/pkg/pattern/ingester_test.go index d35d893fdc8a..5c4ac61484e7 100644 --- a/pkg/pattern/ingester_test.go +++ b/pkg/pattern/ingester_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" @@ -39,6 +40,16 @@ func setup(t *testing.T) *instance { return inst } +func downsampleInstance(inst *instance, tts int64) { + ts := model.TimeFromUnixNano(time.Unix(tts, 0).UnixNano()) + _ = inst.streams.ForEach(func(s *stream) (bool, error) { + inst.streams.WithLock(func() { + s.Downsample(ts) + }) + return true, nil + }) +} + func TestInstancePushQuery(t *testing.T) { inst := setup(t) err := inst.Push(context.Background(), &push.PushRequest{ @@ -55,6 +66,7 @@ func TestInstancePushQuery(t *testing.T) { }, }) require.NoError(t, err) + downsampleInstance(inst, 20) err = inst.Push(context.Background(), &push.PushRequest{ Streams: []push.Stream{ @@ -70,6 +82,7 @@ func TestInstancePushQuery(t *testing.T) { }, }) require.NoError(t, err) + downsampleInstance(inst, 30) for i := 0; i <= 30; i++ { err = inst.Push(context.Background(), &push.PushRequest{ @@ -87,6 +100,7 @@ func TestInstancePushQuery(t *testing.T) { }) require.NoError(t, err) } + downsampleInstance(inst, 30) it, err := inst.Iterator(context.Background(), &logproto.QueryPatternsRequest{ Query: "{test=\"test\"}", @@ -115,6 +129,9 @@ func TestInstancePushQuerySamples(t *testing.T) { }, }, }) + require.NoError(t, err) + downsampleInstance(inst, 0) + for i := 1; i <= 30; i++ { err = inst.Push(context.Background(), &push.PushRequest{ Streams: []push.Stream{ @@ -130,8 +147,8 @@ func TestInstancePushQuerySamples(t *testing.T) { }, }) require.NoError(t, err) + downsampleInstance(inst, int64(20*i)) } - require.NoError(t, err) expr, err := syntax.ParseSampleExpr(`count_over_time({test="test"}[20s])`) require.NoError(t, err) @@ -149,10 +166,11 @@ func TestInstancePushQuerySamples(t *testing.T) { require.Equal(t, lbls.String(), res.Series[0].GetLabels()) - // end - start / step -- (start is 0, step is 10s) + // end - start / step -- (start is 0, step is 10s, downsampling at 20s intervals) expectedDataPoints := ((20 * 30) / 10) require.Equal(t, expectedDataPoints, len(res.Series[0].Samples)) require.Equal(t, float64(1), res.Series[0].Samples[0].Value) + require.Equal(t, float64(1), res.Series[0].Samples[expectedDataPoints-1].Value) expr, err = syntax.ParseSampleExpr(`count_over_time({test="test"}[80s])`) require.NoError(t, err) @@ -170,7 +188,7 @@ func TestInstancePushQuerySamples(t *testing.T) { require.Equal(t, lbls.String(), res.Series[0].GetLabels()) - // end - start / step -- (start is 0, step is 10s) + // end - start / step -- (start is 0, step is 10s, downsampling at 20s intervals) expectedDataPoints = ((20 * 30) / 10) require.Equal(t, expectedDataPoints, len(res.Series[0].Samples)) @@ -187,6 +205,101 @@ func TestInstancePushQuerySamples(t *testing.T) { require.Equal(t, float64(4), res.Series[0].Samples[expectedDataPoints-1].Value) }) + t.Run("test count_over_time samples with downsampling", func(t *testing.T) { + inst := setup(t) + err := inst.Push(context.Background(), &push.PushRequest{ + Streams: []push.Stream{ + { + Labels: lbls.String(), + Entries: []push.Entry{ + { + Timestamp: time.Unix(0, 0), + Line: "ts=1 msg=hello", + }, + }, + }, + }, + }) + require.NoError(t, err) + downsampleInstance(inst, 0) + + for i := 1; i <= 30; i++ { + err = inst.Push(context.Background(), &push.PushRequest{ + Streams: []push.Stream{ + { + Labels: lbls.String(), + Entries: []push.Entry{ + { + Timestamp: time.Unix(int64(10*i), 0), + Line: "foo bar foo bar", + }, + }, + }, + }, + }) + require.NoError(t, err) + + // downsample every 20s + if i%2 == 0 { + downsampleInstance(inst, int64(10*i)) + } + } + + expr, err := syntax.ParseSampleExpr(`count_over_time({test="test"}[20s])`) + require.NoError(t, err) + + it, err := inst.QuerySample(context.Background(), expr, &logproto.QuerySamplesRequest{ + Query: expr.String(), + Start: time.Unix(0, 0), + End: time.Unix(int64(10*30), 0), + Step: 20000, + }) + require.NoError(t, err) + res, err := iter.ReadAllSamples(it) + require.NoError(t, err) + require.Equal(t, 1, len(res.Series)) + + require.Equal(t, lbls.String(), res.Series[0].GetLabels()) + + // end - start / step -- (start is 0, step is 10s, downsampling at 20s intervals) + expectedDataPoints := ((10 * 30) / 20) + require.Equal(t, expectedDataPoints, len(res.Series[0].Samples)) + require.Equal(t, float64(1), res.Series[0].Samples[0].Value) + + // after the first push there's 2 pushes per sample due to downsampling + require.Equal(t, float64(2), res.Series[0].Samples[expectedDataPoints-1].Value) + + expr, err = syntax.ParseSampleExpr(`count_over_time({test="test"}[80s])`) + require.NoError(t, err) + + it, err = inst.QuerySample(context.Background(), expr, &logproto.QuerySamplesRequest{ + Query: expr.String(), + Start: time.Unix(0, 0), + End: time.Unix(int64(10*30), 0), + Step: 20000, + }) + require.NoError(t, err) + res, err = iter.ReadAllSamples(it) + require.NoError(t, err) + require.Equal(t, 1, len(res.Series)) + + require.Equal(t, lbls.String(), res.Series[0].GetLabels()) + + // end - start / step -- (start is 0, step is 10s, downsampling at 20s intervals) + expectedDataPoints = ((10 * 30) / 20) + require.Equal(t, expectedDataPoints, len(res.Series[0].Samples)) + + // with a larger selection range of 80s, we expect to eventually get up to 8 per datapoint + // our pushes are spaced 10s apart, downsampled every 20s, and there's 10s step, + // so we expect to see the value increase by 2 every samples, maxing out and staying at 8 after 5 samples + require.Equal(t, float64(1), res.Series[0].Samples[0].Value) + require.Equal(t, float64(3), res.Series[0].Samples[1].Value) + require.Equal(t, float64(5), res.Series[0].Samples[2].Value) + require.Equal(t, float64(7), res.Series[0].Samples[3].Value) + require.Equal(t, float64(8), res.Series[0].Samples[4].Value) + require.Equal(t, float64(8), res.Series[0].Samples[expectedDataPoints-1].Value) + }) + t.Run("test bytes_over_time samples", func(t *testing.T) { inst := setup(t) err := inst.Push(context.Background(), &push.PushRequest{ @@ -202,6 +315,9 @@ func TestInstancePushQuerySamples(t *testing.T) { }, }, }) + require.NoError(t, err) + + downsampleInstance(inst, 0) for i := 1; i <= 30; i++ { err = inst.Push(context.Background(), &push.PushRequest{ Streams: []push.Stream{ @@ -217,8 +333,8 @@ func TestInstancePushQuerySamples(t *testing.T) { }, }) require.NoError(t, err) + downsampleInstance(inst, int64(20*i)) } - require.NoError(t, err) expr, err := syntax.ParseSampleExpr(`bytes_over_time({test="test"}[20s])`) require.NoError(t, err) @@ -343,6 +459,9 @@ func TestInstancePushQuerySamples(t *testing.T) { }, }, }) + require.NoError(t, err) + downsampleInstance(inst, 0) + for i := 1; i <= 30; i++ { err = inst.Push(context.Background(), &push.PushRequest{ Streams: []push.Stream{ @@ -397,8 +516,8 @@ func TestInstancePushQuerySamples(t *testing.T) { }, }) require.NoError(t, err) + downsampleInstance(inst, int64(20*i)) } - require.NoError(t, err) for _, tt := range []struct { name string diff --git a/pkg/pattern/instance_test.go b/pkg/pattern/instance_test.go index 22bfaa53330f..6bc3dd0ed465 100644 --- a/pkg/pattern/instance_test.go +++ b/pkg/pattern/instance_test.go @@ -36,6 +36,15 @@ func TestInstance_QuerySample(t *testing.T) { return instance } + downsampleInstance := func(inst *instance, ts model.Time) { + _ = inst.streams.ForEach(func(s *stream) (bool, error) { + inst.streams.WithLock(func() { + s.Downsample(ts) + }) + return true, nil + }) + } + ctx := context.Background() thirtySeconds := int64(30000) @@ -85,6 +94,7 @@ func TestInstance_QuerySample(t *testing.T) { }, }) require.NoError(t, err) + downsampleInstance(instance, model.Time(lastTsMilli)) // 5 min query range // 1 min step @@ -203,6 +213,7 @@ func TestInstance_QuerySample(t *testing.T) { }, }) require.NoError(t, err) + downsampleInstance(instance, model.Time(then+oneMin+thirtySeconds)) err = instance.Push(ctx, &logproto.PushRequest{ Streams: []push.Stream{ @@ -245,6 +256,7 @@ func TestInstance_QuerySample(t *testing.T) { }, }) require.NoError(t, err) + downsampleInstance(instance, model.Time(then+oneMin+oneMin+oneMin+thirtySeconds)) // steps start := then diff --git a/pkg/pattern/metric/chunk.go b/pkg/pattern/metric/chunk.go index d4dfcb65d061..6694ac0ebe88 100644 --- a/pkg/pattern/metric/chunk.go +++ b/pkg/pattern/metric/chunk.go @@ -34,12 +34,13 @@ type metrics struct { } type Chunks struct { - chunks []*Chunk - labels labels.Labels - service string - metrics metrics - logger log.Logger - lock sync.RWMutex + chunks []*Chunk + labels labels.Labels + lock sync.RWMutex + logger log.Logger + metrics metrics + rawSamples SamplesWithoutTS + service string } func NewChunks(labels labels.Labels, chunkMetrics *ChunkMetrics, logger log.Logger) *Chunks { @@ -55,37 +56,25 @@ func NewChunks(labels labels.Labels, chunkMetrics *ChunkMetrics, logger log.Logg ) return &Chunks{ - chunks: []*Chunk{}, - labels: labels, - service: service, + chunks: []*Chunk{}, + labels: labels, + logger: logger, + rawSamples: SamplesWithoutTS{}, + service: service, + metrics: metrics{ chunks: chunkMetrics.chunks.WithLabelValues(service), samples: chunkMetrics.samples.WithLabelValues(service), }, - logger: logger, } } -func (c *Chunks) Observe(bytes, count float64, ts model.Time) { +func (c *Chunks) Observe(bytes, count float64) { c.lock.Lock() defer c.lock.Unlock() + c.rawSamples = append(c.rawSamples, newSampleWithoutTS(bytes, count)) c.metrics.samples.Inc() - - if len(c.chunks) == 0 { - c.chunks = append(c.chunks, newChunk(bytes, count, ts)) - c.metrics.chunks.Set(float64(len(c.chunks))) - return - } - - last := c.chunks[len(c.chunks)-1] - if !last.spaceFor(ts) { - c.chunks = append(c.chunks, newChunk(bytes, count, ts)) - c.metrics.chunks.Set(float64(len(c.chunks))) - return - } - - last.AddSample(newSample(bytes, count, ts)) } func (c *Chunks) Prune(olderThan time.Duration) bool { @@ -204,6 +193,11 @@ type Sample struct { Count float64 } +type SampleWithoutTS struct { + Bytes float64 + Count float64 +} + func newSample(bytes, count float64, ts model.Time) Sample { return Sample{ Timestamp: ts, @@ -212,7 +206,17 @@ func newSample(bytes, count float64, ts model.Time) Sample { } } -type Samples []Sample +func newSampleWithoutTS(bytes, count float64) SampleWithoutTS { + return SampleWithoutTS{ + Bytes: bytes, + Count: count, + } +} + +type ( + Samples []Sample + SamplesWithoutTS []SampleWithoutTS +) type Chunk struct { Samples Samples @@ -291,3 +295,34 @@ func (c *Chunk) ForTypeAndRange( return aggregatedSamples, nil } + +func (c *Chunks) Downsample(now model.Time) { + c.lock.Lock() + defer func() { + c.lock.Unlock() + c.rawSamples = c.rawSamples[:0] + }() + + var totalBytes, totalCount float64 + for _, sample := range c.rawSamples { + totalBytes += sample.Bytes + totalCount += sample.Count + } + + c.metrics.samples.Inc() + + if len(c.chunks) == 0 { + c.chunks = append(c.chunks, newChunk(totalBytes, totalCount, now)) + c.metrics.chunks.Set(float64(len(c.chunks))) + return + } + + last := c.chunks[len(c.chunks)-1] + if !last.spaceFor(now) { + c.chunks = append(c.chunks, newChunk(totalBytes, totalCount, now)) + c.metrics.chunks.Set(float64(len(c.chunks))) + return + } + + last.AddSample(newSample(totalBytes, totalCount, now)) +} diff --git a/pkg/pattern/metric/chunk_test.go b/pkg/pattern/metric/chunk_test.go index 40701744da9b..0e335a43cfc1 100644 --- a/pkg/pattern/metric/chunk_test.go +++ b/pkg/pattern/metric/chunk_test.go @@ -447,3 +447,31 @@ func Test_Chunks_Iterator(t *testing.T) { require.Equal(t, 4, cap(res.Series[0].Samples)) }) } + +func TestDownsample(t *testing.T) { + // Create a Chunks object with two rawChunks, each containing two Samples + c := NewChunks(labels.Labels{ + labels.Label{Name: "foo", Value: "bar"}, + }, NewChunkMetrics(nil, "test"), log.NewNopLogger()) + + c.Observe(2.0, 1.0) + c.Observe(2.0, 1.0) + c.Observe(2.0, 1.0) + + now := model.Time(5000) + // Call the Downsample function + c.Downsample(now) + + chunks := c.chunks + + require.Len(t, chunks, 1) + + // Check that the result is a Chunk with the correct summed values + result := chunks[0] + require.Len(t, result.Samples, 1) + require.Equal(t, 6.0, result.Samples[0].Bytes) + require.Equal(t, 3.0, result.Samples[0].Count) + require.Equal(t, model.Time(5000), result.Samples[0].Timestamp) + + require.Len(t, c.rawSamples, 0) +} diff --git a/pkg/pattern/metric/config.go b/pkg/pattern/metric/config.go index 7a7737368f0d..0cc318998e4a 100644 --- a/pkg/pattern/metric/config.go +++ b/pkg/pattern/metric/config.go @@ -1,10 +1,14 @@ package metric -import "flag" +import ( + "flag" + "time" +) type AggregationConfig struct { - Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester metric aggregation is enabled."` - LogPushObservations bool `yaml:"log_push_observations,omitempty" doc:"description=Whether to log push observations."` + Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester metric aggregation is enabled."` + LogPushObservations bool `yaml:"log_push_observations,omitempty" doc:"description=Whether to log push observations."` + DownsamplePeriod time.Duration `yaml:"downsample_period"` } // RegisterFlags registers pattern ingester related flags. @@ -13,6 +17,22 @@ func (cfg *AggregationConfig) RegisterFlags(fs *flag.FlagSet) { } func (cfg *AggregationConfig) RegisterFlagsWithPrefix(fs *flag.FlagSet, prefix string) { - fs.BoolVar(&cfg.Enabled, prefix+"metric-aggregation.enabled", false, "Flag to enable or disable metric aggregation.") - fs.BoolVar(&cfg.LogPushObservations, prefix+"metric-aggregation.log-push-observations", false, "Flag to enable or disable logging of push observations.") + fs.BoolVar( + &cfg.Enabled, + prefix+"metric-aggregation.enabled", + false, + "Flag to enable or disable metric aggregation.", + ) + fs.BoolVar( + &cfg.LogPushObservations, + prefix+"metric-aggregation.log-push-observations", + false, + "Flag to enable or disable logging of push observations.", + ) + fs.DurationVar( + &cfg.DownsamplePeriod, + "pattern-ingester.downsample-period", + 10*time.Second, + "How often to downsample metrics from raw push observations.", + ) } diff --git a/pkg/pattern/stream.go b/pkg/pattern/stream.go index f6f6f962e0f5..0fe2351d84c1 100644 --- a/pkg/pattern/stream.go +++ b/pkg/pattern/stream.go @@ -109,7 +109,7 @@ func (s *stream) Push( "sample_ts_ns", s.lastTs, ) } - s.chunks.Observe(bytes, count, model.TimeFromUnixNano(s.lastTs)) + s.chunks.Observe(bytes, count) } return nil } @@ -291,3 +291,12 @@ func (s *stream) prune(olderThan time.Duration) bool { return len(s.patterns.Clusters()) == 0 && chunksPruned } + +func (s *stream) Downsample(ts model.Time) { + s.mtx.Lock() + defer s.mtx.Unlock() + + if s.chunks != nil { + s.chunks.Downsample(ts) + } +} diff --git a/pkg/pattern/stream_test.go b/pkg/pattern/stream_test.go index 65d02e8f5fe7..b627d924bed8 100644 --- a/pkg/pattern/stream_test.go +++ b/pkg/pattern/stream_test.go @@ -136,6 +136,7 @@ func TestSampleIterator(t *testing.T) { Line: "ts=2 msg=hello", }, }) + stream.Downsample(model.TimeFromUnix(20)) require.NoError(t, err) @@ -185,6 +186,7 @@ func TestSampleIterator(t *testing.T) { }, }) require.NoError(t, err) + stream.Downsample(model.TimeFromUnix(20)) err = stream.Push(context.Background(), []push.Entry{ { @@ -197,6 +199,7 @@ func TestSampleIterator(t *testing.T) { }, }) require.NoError(t, err) + stream.Downsample(model.TimeFromUnix(40)) t.Run("non-overlapping timestamps", func(t *testing.T) { expr, err := syntax.ParseSampleExpr("count_over_time({foo=\"bar\"}[5s])") @@ -273,6 +276,7 @@ func TestSampleIterator(t *testing.T) { Line: "ts=2 msg=hello", }, }) + stream.Downsample(model.TimeFromUnixNano(time.Unix(26, 999).UnixNano())) require.NoError(t, err)