diff --git a/pkg/pattern/ingester.go b/pkg/pattern/ingester.go
index e6174faffb6c..a3d6c4ef67a0 100644
--- a/pkg/pattern/ingester.go
+++ b/pkg/pattern/ingester.go
@@ -25,6 +25,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/logql/syntax"
 	"github.com/grafana/loki/v3/pkg/pattern/clientpool"
 	"github.com/grafana/loki/v3/pkg/pattern/iter"
+	"github.com/grafana/loki/v3/pkg/pattern/metric"
 	"github.com/grafana/loki/v3/pkg/util"
 	util_log "github.com/grafana/loki/v3/pkg/util/log"
 )
@@ -38,6 +39,7 @@ type Config struct {
 	ConcurrentFlushes int           `yaml:"concurrent_flushes"`
 	FlushCheckPeriod  time.Duration `yaml:"flush_check_period"`
+	MetricAggregation metric.AggregationConfig `yaml:"metric_aggregation,omitempty" doc:"description=Configures the metric aggregation and storage behavior of the pattern ingester."`
 
 	// For testing.
 	factory ring_client.PoolFactory `yaml:"-"`
 }
@@ -49,6 +51,8 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
 	fs.BoolVar(&cfg.Enabled, "pattern-ingester.enabled", false, "Flag to enable or disable the usage of the pattern-ingester component.")
 	fs.IntVar(&cfg.ConcurrentFlushes, "pattern-ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
 	fs.DurationVar(&cfg.FlushCheckPeriod, "pattern-ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
+
+	cfg.MetricAggregation.RegisterFlagsWithPrefix(fs, "pattern-ingester.")
 }
 
 func (cfg *Config) Validate() error {
@@ -335,7 +339,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
 	inst, ok = i.instances[instanceID]
 	if !ok {
 		var err error
-		inst, err = newInstance(instanceID, i.logger, i.metrics)
+		inst, err = newInstance(instanceID, i.logger, i.metrics, i.cfg.MetricAggregation)
 		if err != nil {
 			return nil, err
 		}
diff --git a/pkg/pattern/ingester_querier.go b/pkg/pattern/ingester_querier.go
index 41fce2d12398..fc62c7cfdca4 100644
--- a/pkg/pattern/ingester_querier.go
+++ b/pkg/pattern/ingester_querier.go
@@ -45,34 +45,70 @@ func NewIngesterQuerier(
 }
 
 func (q *IngesterQuerier) Patterns(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) {
-	// validate that a supported query was provided
-	var expr syntax.Expr
 	_, err := syntax.ParseMatchers(req.Query, true)
 	if err != nil {
-		expr, err = syntax.ParseSampleExpr(req.Query)
-		if err != nil {
-			return nil, ErrParseQuery
+		// not a pattern query, so either a metric query or an error
+		if q.cfg.MetricAggregation.Enabled {
+			return q.queryMetricSamples(ctx, req)
 		}
-
-		var selector syntax.LogSelectorExpr
-		switch expr.(type) {
-		case *syntax.VectorAggregationExpr:
-			selector, err = expr.(*syntax.VectorAggregationExpr).Selector()
-		case *syntax.RangeAggregationExpr:
-			selector, err = expr.(*syntax.RangeAggregationExpr).Selector()
-		default:
-			return nil, ErrParseQuery
-		}
+		return nil, err
+	}
 
-		if err != nil {
-			return nil, err
-		}
+	return q.queryPatternSamples(ctx, req)
+}
 
-		if selector == nil || selector.HasFilter() {
-			return nil, ErrParseQuery
-		}
+func (q *IngesterQuerier) queryPatternSamples(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) {
+	iterators, err := q.query(ctx, req)
+	if err != nil {
+		return nil, err
+	}
+
+	// TODO(kolesnikovae): Incorporate with pruning
+	resp, err := iter.ReadPatternsBatch(iter.NewMerge(iterators...), math.MaxInt32)
+	if err != nil {
+		return nil, err
+	}
+	return prunePatterns(resp, minClusterSize), nil
+}
+
+func (q *IngesterQuerier) queryMetricSamples(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) {
+	expr, err := syntax.ParseSampleExpr(req.Query)
+	if err != nil {
+		return nil, err
+	}
+
+	var selector syntax.LogSelectorExpr
+	switch expr.(type) {
+	case *syntax.VectorAggregationExpr:
+		selector, err = expr.(*syntax.VectorAggregationExpr).Selector()
+	case *syntax.RangeAggregationExpr:
+		selector, err = expr.(*syntax.RangeAggregationExpr).Selector()
+	default:
+		return nil, ErrParseQuery
 	}
+	if err != nil {
+		return nil, err
+	}
+
+	if selector == nil || selector.HasFilter() {
+		return nil, ErrParseQuery
+	}
+
+	iterators, err := q.query(ctx, req)
+	if err != nil {
+		return nil, err
+	}
+
+	resp, err := iter.ReadMetricsBatch(iter.NewMerge(iterators...), math.MaxInt32)
+	if err != nil {
+		return nil, err
+	}
+	return resp, nil
+}
+
+func (q *IngesterQuerier) query(ctx context.Context, req *logproto.QueryPatternsRequest) ([]iter.Iterator, error) {
 	resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.PatternClient) (interface{}, error) {
 		return client.Query(ctx, req)
 	})
@@ -83,21 +119,7 @@ func (q *IngesterQuerier) Patterns(ctx context.Context, req *logproto.QueryPatte
 	for i := range resps {
 		iterators[i] = iter.NewQueryClientIterator(resps[i].response.(logproto.Pattern_QueryClient))
 	}
-	switch expr.(type) {
-	case *syntax.VectorAggregationExpr, *syntax.RangeAggregationExpr:
-		resp, err := iter.ReadMetricsBatch(iter.NewMerge(iterators...), math.MaxInt32)
-		if err != nil {
-			return nil, err
-		}
-		return resp, nil
-	default:
-		// TODO(kolesnikovae): Incorporate with pruning
-		resp, err := iter.ReadPatternsBatch(iter.NewMerge(iterators...), math.MaxInt32)
-		if err != nil {
-			return nil, err
-		}
-		return prunePatterns(resp, minClusterSize), nil
-	}
+	return iterators, nil
 }
 
 func prunePatterns(
diff --git a/pkg/pattern/ingester_querier_test.go b/pkg/pattern/ingester_querier_test.go
index 809059638401..20b3dcae83d7 100644
--- a/pkg/pattern/ingester_querier_test.go
+++ b/pkg/pattern/ingester_querier_test.go
@@ -14,6 +14,7 @@ import (
 	ring_client "github.com/grafana/dskit/ring/client"
 	"github.com/grafana/dskit/services"
 	"github.com/grafana/loki/v3/pkg/logproto"
+	"github.com/grafana/loki/v3/pkg/pattern/metric"
 )
 
 func Test_prunePatterns(t *testing.T) {
@@ -49,7 +50,11 @@ func Test_prunePatterns(t *testing.T) {
 func Test_Patterns(t *testing.T) {
 	t.Run("it rejects metric queries with filters", func(t *testing.T) {
 		q := &IngesterQuerier{
-			cfg:        Config{},
+			cfg: Config{
+				MetricAggregation: metric.AggregationConfig{
+					Enabled: true,
+				},
+			},
 			logger:     log.NewNopLogger(),
 			ringClient: &fakeRingClient{},
 			registerer: nil,
@@ -57,11 +62,11 @@ func Test_Patterns(t *testing.T) {
 		for _, query := range []string{
 			`count_over_time({foo="bar"} |= "baz" [5m])`,
 			`count_over_time({foo="bar"} != "baz" [5m])`,
-			`count_over_time({foo="bar"} =~ "baz" [5m])`,
+			`count_over_time({foo="bar"} |~ "baz" [5m])`,
 			`count_over_time({foo="bar"} !~ "baz" [5m])`,
-			`count_over_time({foo="bar"} | logfmt | color=blue [5m])`,
+			`count_over_time({foo="bar"} | logfmt | color="blue" [5m])`,
 			`sum(count_over_time({foo="bar"} |= "baz" [5m]))`,
-			`sum by label(count_over_time({foo="bar"} |= "baz" [5m]))`,
+			`sum by (label)(count_over_time({foo="bar"} |= "baz" [5m]))`,
 			`bytes_over_time({foo="bar"} |= "baz" [5m])`,
 		} {
 			_, err := q.Patterns(
@@ -71,14 +76,18 @@
 				},
 			)
 			require.Error(t, err, query)
-			require.ErrorIs(t, err, ErrParseQuery)
+			require.ErrorIs(t, err, ErrParseQuery, query)
 		}
 	})
 
 	t.Run("accepts log selector queries and count and bytes metric queries", func(t *testing.T) {
 		q := &IngesterQuerier{
-			cfg:        Config{},
+			cfg: Config{
+				MetricAggregation: metric.AggregationConfig{
+					Enabled: true,
+				},
+			},
 			logger:     log.NewNopLogger(),
 			ringClient: &fakeRingClient{},
 			registerer: nil,
diff --git a/pkg/pattern/ingester_test.go b/pkg/pattern/ingester_test.go
index 6d0ca3485678..9707837a4fab 100644
--- a/pkg/pattern/ingester_test.go
+++ b/pkg/pattern/ingester_test.go
@@ -13,17 +13,24 @@ import (
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logql/syntax"
 	"github.com/grafana/loki/v3/pkg/pattern/iter"
+	"github.com/grafana/loki/v3/pkg/pattern/metric"
 
 	"github.com/grafana/loki/pkg/push"
 )
 
 func TestInstancePushQuery(t *testing.T) {
-	t.Run("test pattern samples", func(t *testing.T) {
-		lbs := labels.New(labels.Label{Name: "test", Value: "test"})
-		inst, err := newInstance("foo", log.NewNopLogger(), newIngesterMetrics(nil, "test"))
+	lbs := labels.New(labels.Label{Name: "test", Value: "test"})
+	setup := func() *instance {
+		inst, err := newInstance("foo", log.NewNopLogger(), newIngesterMetrics(nil, "test"), metric.AggregationConfig{
+			Enabled: true,
+		})
 		require.NoError(t, err)
 
-		err = inst.Push(context.Background(), &push.PushRequest{
+		return inst
+	}
+	t.Run("test pattern samples", func(t *testing.T) {
+		inst := setup()
+		err := inst.Push(context.Background(), &push.PushRequest{
 			Streams: []push.Stream{
 				{
 					Labels: lbs.String(),
@@ -92,11 +99,8 @@ func TestInstancePushQuery(t *testing.T) {
 	})
 
 	t.Run("test count_over_time samples", func(t *testing.T) {
-		lbs := labels.New(labels.Label{Name: "test", Value: "test"})
-		inst, err := newInstance("foo", log.NewNopLogger(), nil)
-		require.NoError(t, err)
-
-		err = inst.Push(context.Background(), &push.PushRequest{
+		inst := setup()
+		err := inst.Push(context.Background(), &push.PushRequest{
 			Streams: []push.Stream{
 				{
 					Labels: lbs.String(),
@@ -184,11 +188,8 @@ func TestInstancePushQuery(t *testing.T) {
 	})
 
 	t.Run("test bytes_over_time samples", func(t *testing.T) {
-		lbs := labels.New(labels.Label{Name: "test", Value: "test"})
-		inst, err := newInstance("foo", log.NewNopLogger(), nil)
-		require.NoError(t, err)
-
-		err = inst.Push(context.Background(), &push.PushRequest{
+		inst := setup()
+		err := inst.Push(context.Background(), &push.PushRequest{
 			Streams: []push.Stream{
 				{
 					Labels: lbs.String(),
diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go
index cafc8dec2354..d98b412a85e2 100644
--- a/pkg/pattern/instance.go
+++ b/pkg/pattern/instance.go
@@ -25,27 +25,29 @@ const indexShards = 32
 
 // instance is a tenant instance of the pattern ingester.
 type instance struct {
-	instanceID string
-	buf        []byte // buffer used to compute fps.
-	mapper     *ingester.FpMapper // using of mapper no longer needs mutex because reading from streams is lock-free
-	streams    *streamsMap
-	index      *index.BitPrefixInvertedIndex
-	logger     log.Logger
-	metrics    *ingesterMetrics
+	instanceID     string
+	buf            []byte // buffer used to compute fps.
+	mapper         *ingester.FpMapper // using of mapper no longer needs mutex because reading from streams is lock-free
+	streams        *streamsMap
+	index          *index.BitPrefixInvertedIndex
+	logger         log.Logger
+	metrics        *ingesterMetrics
+	aggregationCfg metric.AggregationConfig
 }
 
-func newInstance(instanceID string, logger log.Logger, metrics *ingesterMetrics) (*instance, error) {
+func newInstance(instanceID string, logger log.Logger, metrics *ingesterMetrics, aggCfg metric.AggregationConfig) (*instance, error) {
 	index, err := index.NewBitPrefixWithShards(indexShards)
 	if err != nil {
 		return nil, err
 	}
 	i := &instance{
-		buf:        make([]byte, 0, 1024),
-		logger:     logger,
-		instanceID: instanceID,
-		streams:    newStreamsMap(),
-		index:      index,
-		metrics:    metrics,
+		buf:            make([]byte, 0, 1024),
+		logger:         logger,
+		instanceID:     instanceID,
+		streams:        newStreamsMap(),
+		index:          index,
+		metrics:        metrics,
+		aggregationCfg: aggCfg,
 	}
 	i.mapper = ingester.NewFPMapper(i.getLabelsFromFingerprint)
 	return i, nil
@@ -60,7 +62,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
 		s, _, err := i.streams.LoadOrStoreNew(reqStream.Labels,
 			func() (*stream, error) {
 				// add stream
-				return i.createStream(ctx, reqStream)
+				return i.createStream(ctx, reqStream, i.aggregationCfg.Enabled)
 			}, nil)
 		if err != nil {
 			appendErr.Add(err)
@@ -107,6 +109,11 @@ func (i *instance) QuerySample(
 	expr syntax.SampleExpr,
 	req *logproto.QueryPatternsRequest,
 ) (iter.Iterator, error) {
+	if !i.aggregationCfg.Enabled {
+		// Should never get here, but this will prevent nil pointer panics in test
+		return iter.Empty, nil
+	}
+
 	from, through := util.RoundToMilliseconds(req.Start, req.End)
 	step := model.Time(req.Step)
 	if step < chunk.TimeResolution {
@@ -184,14 +191,14 @@ outer:
 	return nil
 }
 
-func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream) (*stream, error) {
+func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream, aggregateMetrics bool) (*stream, error) {
 	labels, err := syntax.ParseLabels(pushReqStream.Labels)
 	if err != nil {
 		return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
 	}
 	fp := i.getHashForLabels(labels)
 	sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp)
-	s, err := newStream(fp, sortedLabels, i.metrics)
+	s, err := newStream(fp, sortedLabels, i.metrics, i.aggregationCfg.Enabled)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create stream: %w", err)
 	}
diff --git a/pkg/pattern/instance_test.go b/pkg/pattern/instance_test.go
index e40d512f969a..1a3699148920 100644
--- a/pkg/pattern/instance_test.go
+++ b/pkg/pattern/instance_test.go
@@ -9,6 +9,7 @@ import (
 	"github.com/grafana/loki/pkg/push"
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logql/syntax"
+	"github.com/grafana/loki/v3/pkg/pattern/metric"
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -28,7 +29,7 @@ func TestInstance_QuerySample(t *testing.T) {
 		Step:  oneMin,
 	}
 
-	instance, err := newInstance("test", log.NewNopLogger(), nil)
+	instance, err := newInstance("test", log.NewNopLogger(), nil, metric.AggregationConfig{})
 	require.NoError(t, err)
 
 	labels := model.LabelSet{
@@ -37,7 +38,7 @@ func TestInstance_QuerySample(t *testing.T) {
 
 	lastTsMilli := (then + oneMin + oneMin) // 1715964095000
 
-	//TODO(twhitney): Add a few more pushes to this or another test
+	// TODO(twhitney): Add a few more pushes to this or another test
 	instance.Push(ctx, &logproto.PushRequest{
 		Streams: []push.Stream{
 			{
diff --git a/pkg/pattern/metric/config.go b/pkg/pattern/metric/config.go
new file mode 100644
index 000000000000..8bdf85cdcb64
--- /dev/null
+++ b/pkg/pattern/metric/config.go
@@ -0,0 +1,16 @@
+package metric
+
+import "flag"
+
+type AggregationConfig struct {
+	Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester metric aggregation is enabled."`
+}
+
+// RegisterFlags registers pattern ingester related flags.
+func (cfg *AggregationConfig) RegisterFlags(fs *flag.FlagSet) {
+	cfg.RegisterFlagsWithPrefix(fs, "")
+}
+
+func (cfg *AggregationConfig) RegisterFlagsWithPrefix(fs *flag.FlagSet, prefix string) {
+	fs.BoolVar(&cfg.Enabled, prefix+"metric-aggregation.enabled", false, "Flag to enable or disable metric aggregation.")
+}
diff --git a/pkg/pattern/stream.go b/pkg/pattern/stream.go
index 24c9e21c96f8..0c08790dd708 100644
--- a/pkg/pattern/stream.go
+++ b/pkg/pattern/stream.go
@@ -26,7 +26,9 @@ type stream struct {
 	labelHash    uint64
 	patterns     *drain.Drain
 	mtx          sync.Mutex
-	metrics      *metric.Chunks
+
+	aggregateMetrics bool
+	metrics          *metric.Chunks
 
 	evaluator metric.SampleEvaluatorFactory
 
@@ -37,9 +39,9 @@ func newStream(
 	fp model.Fingerprint,
 	labels labels.Labels,
 	metrics *ingesterMetrics,
+	aggregateMetrics bool,
 ) (*stream, error) {
-	chunks := metric.NewChunks(labels)
-	return &stream{
+	stream := &stream{
 		fp:           fp,
 		labels:       labels,
 		labelsString: labels.String(),
@@ -48,9 +50,16 @@ func newStream(
 			PatternsEvictedTotal:  metrics.patternsDiscardedTotal,
 			PatternsDetectedTotal: metrics.patternsDetectedTotal,
 		}),
-		metrics:   chunks,
-		evaluator: metric.NewDefaultEvaluatorFactory(chunks),
-	}, nil
+		aggregateMetrics: aggregateMetrics,
+	}
+
+	if aggregateMetrics {
+		chunks := metric.NewChunks(labels)
+		stream.metrics = chunks
+		stream.evaluator = metric.NewDefaultEvaluatorFactory(chunks)
+	}
+
+	return stream, nil
 }
 
 func (s *stream) Push(
@@ -73,7 +82,9 @@ func (s *stream) Push(
 		s.patterns.Train(entry.Line, entry.Timestamp.UnixNano())
 	}
 
-	s.metrics.Observe(bytes, count, model.TimeFromUnixNano(s.lastTs))
+	if s.aggregateMetrics && s.metrics != nil {
+		s.metrics.Observe(bytes, count, model.TimeFromUnixNano(s.lastTs))
+	}
 
 	return nil
 }
diff --git a/pkg/pattern/stream_test.go b/pkg/pattern/stream_test.go
index ab0564af9995..7f6f98d15b21 100644
--- a/pkg/pattern/stream_test.go
+++ b/pkg/pattern/stream_test.go
@@ -16,7 +16,7 @@ import (
 func TestAddStream(t *testing.T) {
 	lbs := labels.New(labels.Label{Name: "test", Value: "test"})
 
-	stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs, newIngesterMetrics(nil, "test"))
+	stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs, newIngesterMetrics(nil, "test"), false)
 	require.NoError(t, err)
 
 	err = stream.Push(context.Background(), []push.Entry{
@@ -44,7 +44,12 @@ func TestPruneStream(t *testing.T) {
 	lbs := labels.New(labels.Label{Name: "test", Value: "test"})
 
-	stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs, newIngesterMetrics(nil, "test"))
+	stream, err := newStream(
+		model.Fingerprint(lbs.Hash()),
+		lbs,
+		newIngesterMetrics(nil, "test"),
+		false,
+	)
 	require.NoError(t, err)
 
 	err = stream.Push(context.Background(), []push.Entry{
diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go
index bfe023b90b56..b05d9411faaf 100644
--- a/pkg/querier/querier.go
+++ b/pkg/querier/querier.go
@@ -34,7 +34,6 @@ import (
 	logql_log "github.com/grafana/loki/v3/pkg/logql/log"
 	"github.com/grafana/loki/v3/pkg/logql/syntax"
 	"github.com/grafana/loki/v3/pkg/logqlmodel"
-	"github.com/grafana/loki/v3/pkg/pattern"
 	querier_limits "github.com/grafana/loki/v3/pkg/querier/limits"
 	"github.com/grafana/loki/v3/pkg/querier/plan"
 	"github.com/grafana/loki/v3/pkg/storage"
@@ -1049,9 +1048,7 @@ func (q *SingleTenantQuerier) Patterns(ctx context.Context, req *logproto.QueryP
 	}
 	res, err := q.patternQuerier.Patterns(ctx, req)
 	if err != nil {
-		if errors.Is(err, pattern.ErrParseQuery) {
-			return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
-		}
+		return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
 	}
 	return res, err