From 716a7aff53ee8f3d8fe8a9a50697c6fa6a465d28 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Thu, 11 Jul 2024 08:59:19 -0700 Subject: [PATCH] fix: backport #13485 to k210 (#13492) --- pkg/logql/engine.go | 3 +- pkg/logql/engine_test.go | 47 +++++++++++++++++++++++++------ pkg/logql/first_last_over_time.go | 27 ++++-------------- 3 files changed, 44 insertions(+), 33 deletions(-) diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index f35a1b397a3b..0a26520b673c 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -378,11 +378,10 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_ return nil, fmt.Errorf("unsupported result type: %T", r) } } - return nil, nil + return nil, errors.New("unexpected empty result") } func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) { - seriesIndex := map[uint64]*promql.Series{} vec := promql.Vector{} diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index 274276a02c8f..f2c4a2b4f0dd 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -65,15 +65,17 @@ func TestEngine_LogsRateUnwrap(t *testing.T) { {newSeries(testSize, offset(46, constantValue(1)), `{app="foo"}`)}, }, []SelectSampleParams{ - {&logproto.SampleQueryRequest{ - Start: time.Unix(30, 0), - End: time.Unix(60, 0), - Selector: `rate({app="foo"} | unwrap foo[30s])`, - Plan: &plan.QueryPlan{ - AST: syntax.MustParseExpr(`rate({app="foo"} | unwrap foo[30s])`), + { + &logproto.SampleQueryRequest{ + Start: time.Unix(30, 0), + End: time.Unix(60, 0), + Selector: `rate({app="foo"} | unwrap foo[30s])`, + Plan: &plan.QueryPlan{ + AST: syntax.MustParseExpr(`rate({app="foo"} | unwrap foo[30s])`), + }, }, }, - }}, + }, // there are 15 samples (from 47 to 61) matched from the generated series // SUM(n=47, 61, 1) = 15 // 15 / 30 = 0.5 @@ -1611,10 +1613,12 @@ func TestEngine_RangeQuery(t *testing.T) { promql.Series{ // vector result Metric: labels.Labels(nil), - Floats: []promql.FPoint{{T: 60000, F: 0}, {T: 80000, F: 0}, {T: 100000, F: 0}, {T: 120000, F: 0}, {T: 140000, F: 0}, {T: 160000, F: 0}, {T: 180000, F: 0}}}, + Floats: []promql.FPoint{{T: 60000, F: 0}, {T: 80000, F: 0}, {T: 100000, F: 0}, {T: 120000, F: 0}, {T: 140000, F: 0}, {T: 160000, F: 0}, {T: 180000, F: 0}}, + }, promql.Series{ Metric: labels.FromStrings("app", "foo"), - Floats: []promql.FPoint{{T: 60000, F: 0.03333333333333333}, {T: 80000, F: 0.06666666666666667}, {T: 100000, F: 0.06666666666666667}, {T: 120000, F: 0.03333333333333333}, {T: 180000, F: 0.03333333333333333}}}, + Floats: []promql.FPoint{{T: 60000, F: 0.03333333333333333}, {T: 80000, F: 0.06666666666666667}, {T: 100000, F: 0.06666666666666667}, {T: 120000, F: 0.03333333333333333}, {T: 180000, F: 0.03333333333333333}}, + }, }, }, { @@ -2656,6 +2660,31 @@ func TestHashingStability(t *testing.T) { } } +func TestUnexpectedEmptyResults(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "fake") + + mock := &mockEvaluatorFactory{SampleEvaluatorFunc(func(context.Context, SampleEvaluatorFactory, syntax.SampleExpr, Params) (StepEvaluator, error) { + return EmptyEvaluator[SampleVector]{value: nil}, nil + })} + + eng := NewEngine(EngineOpts{}, nil, NoLimits, log.NewNopLogger()) + params, err := NewLiteralParams(`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, time.Now(), time.Now(), 0, 0, logproto.BACKWARD, 0, nil, nil) + require.NoError(t, err) + q := eng.Query(params).(*query) + q.evaluator = mock + + _, err = q.Exec(ctx) + require.Error(t, err) +} + +type mockEvaluatorFactory struct { + SampleEvaluatorFactory +} + +func (*mockEvaluatorFactory) NewIterator(context.Context, syntax.LogSelectorExpr, Params) (iter.EntryIterator, error) { + return nil, errors.New("unimplemented mock EntryEvaluatorFactory") +} + func getLocalQuerier(size int64) Querier { return &querierRecorder{ series: map[string][]logproto.Series{ diff --git a/pkg/logql/first_last_over_time.go b/pkg/logql/first_last_over_time.go index e24133d13bfe..6d0329cacf8d 100644 --- a/pkg/logql/first_last_over_time.go +++ b/pkg/logql/first_last_over_time.go @@ -14,7 +14,8 @@ import ( // of a windowed aggregation. func newFirstWithTimestampIterator( it iter.PeekingSampleIterator, - selRange, step, start, end, offset int64) RangeVectorIterator { + selRange, step, start, end, offset int64, +) RangeVectorIterator { inner := &batchRangeVectorIterator{ iter: it, step: step, @@ -67,7 +68,8 @@ func (r *firstWithTimestampBatchRangeVectorIterator) agg(samples []promql.FPoint func newLastWithTimestampIterator( it iter.PeekingSampleIterator, - selRange, step, start, end, offset int64) RangeVectorIterator { + selRange, step, start, end, offset int64, +) RangeVectorIterator { inner := &batchRangeVectorIterator{ iter: it, step: step, @@ -129,10 +131,7 @@ type mergeOverTimeStepEvaluator struct { // Next returns the first or last element within one step of each matrix. func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) { - - var ( - vec promql.Vector - ) + var vec promql.Vector e.ts = e.ts.Add(e.step) if e.ts.After(e.end) { @@ -158,10 +157,6 @@ func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) { vec[i].T = ts } - if len(vec) == 0 { - return e.hasNext(), ts, SampleVector(vec) - } - return true, ts, SampleVector(vec) } @@ -179,18 +174,6 @@ func (e *mergeOverTimeStepEvaluator) inRange(t, ts int64) bool { return (ts-e.step.Milliseconds()) <= t && t < ts } -func (e *mergeOverTimeStepEvaluator) hasNext() bool { - for _, m := range e.matrices { - for _, s := range m { - if len(s.Floats) != 0 { - return true - } - } - } - - return false -} - func (*mergeOverTimeStepEvaluator) Close() error { return nil } func (*mergeOverTimeStepEvaluator) Error() error { return nil }