diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index a99223f04f17..ca5605174fbe 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -255,9 +255,12 @@ func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logp if len(groups) == 0 { series = make([]logproto.SeriesIdentifier, 0, len(i.streams)) err = i.forAllStreams(func(stream *stream) error { - series = append(series, logproto.SeriesIdentifier{ - Labels: stream.labels.Map(), - }) + // consider the stream only if it overlaps the request time range + if shouldConsiderStream(stream, req) { + series = append(series, logproto.SeriesIdentifier{ + Labels: stream.labels.Map(), + }) + } return nil }) if err != nil { @@ -267,14 +270,17 @@ func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logp dedupedSeries := make(map[uint64]logproto.SeriesIdentifier) for _, matchers := range groups { err = i.forMatchingStreams(matchers, func(stream *stream) error { - // exit early when this stream was added by an earlier group - key := stream.labels.Hash() - if _, found := dedupedSeries[key]; found { - return nil - } - - dedupedSeries[key] = logproto.SeriesIdentifier{ - Labels: stream.labels.Map(), + // consider the stream only if it overlaps the request time range + if shouldConsiderStream(stream, req) { + // exit early when this stream was added by an earlier group + key := stream.labels.Hash() + if _, found := dedupedSeries[key]; found { + return nil + } + + dedupedSeries[key] = logproto.SeriesIdentifier{ + Labels: stream.labels.Map(), + } } return nil }) @@ -459,3 +465,13 @@ func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer logproto } return nil } + +func shouldConsiderStream(stream *stream, req *logproto.SeriesRequest) bool { + firstchunkFrom, _ := stream.chunks[0].chunk.Bounds() + _, lastChunkTo := stream.chunks[len(stream.chunks)-1].chunk.Bounds() + + if req.End.UnixNano() > firstchunkFrom.UnixNano() && req.Start.UnixNano() <= lastChunkTo.UnixNano() { + return true + } + return false +} diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 1f7f3638b363..13955730fa20 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/rand" + "sort" "sync" "testing" "time" @@ -142,6 +143,102 @@ func TestSyncPeriod(t *testing.T) { } } +func Test_SeriesQuery(t *testing.T) { + limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil) + require.NoError(t, err) + limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) + + // just some random values + syncPeriod := 1 * time.Minute + minUtil := 0.20 + + instance := newInstance(&Config{}, "test", defaultFactory, limiter, syncPeriod, minUtil) + + currentTime := time.Now() + + testStreams := []logproto.Stream{ + {Labels: "{app=\"test\",job=\"varlogs\"}", Entries: entries(5, currentTime)}, + {Labels: "{app=\"test2\",job=\"varlogs\"}", Entries: entries(5, currentTime.Add(6*time.Nanosecond))}, + } + + for _, testStream := range testStreams { + stream, err := instance.getOrCreateStream(testStream) + require.NoError(t, err) + chunk := defaultFactory() + for _, entry := range testStream.Entries { + err = chunk.Append(&entry) + require.NoError(t, err) + } + stream.chunks = append(stream.chunks, chunkDesc{chunk: chunk}) + } + + tests := []struct { + name string + req *logproto.SeriesRequest + expectedResponse []logproto.SeriesIdentifier + }{ + { + "non overlapping request", + &logproto.SeriesRequest{ + Start: currentTime.Add(11 * time.Nanosecond), + End: currentTime.Add(12 * time.Nanosecond), + Groups: []string{`{job="varlogs"}`}, + }, + []logproto.SeriesIdentifier{}, + }, + { + "overlapping request", + &logproto.SeriesRequest{ + Start: currentTime.Add(1 * time.Nanosecond), + End: currentTime.Add(7 * time.Nanosecond), + Groups: []string{`{job="varlogs"}`}, + }, + []logproto.SeriesIdentifier{ + {Labels: map[string]string{"app": "test", "job": "varlogs"}}, + {Labels: map[string]string{"app": "test2", "job": "varlogs"}}, + }, + }, + { + "request end time overlaps stream start time", + &logproto.SeriesRequest{ + Start: currentTime.Add(1 * time.Nanosecond), + End: currentTime.Add(6 * time.Nanosecond), + Groups: []string{`{job="varlogs"}`}, + }, + []logproto.SeriesIdentifier{ + {Labels: map[string]string{"app": "test", "job": "varlogs"}}, + }, + }, + { + "request start time overlaps stream end time", + &logproto.SeriesRequest{ + Start: currentTime.Add(10 * time.Nanosecond), + End: currentTime.Add(11 * time.Nanosecond), + Groups: []string{`{job="varlogs"}`}, + }, + []logproto.SeriesIdentifier{ + {Labels: map[string]string{"app": "test2", "job": "varlogs"}}, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + resp, err := instance.Series(context.Background(), tc.req) + require.NoError(t, err) + + sort.Slice(resp.Series, func(i, j int) bool { + return resp.Series[i].String() < resp.Series[j].String() + }) + sort.Slice(tc.expectedResponse, func(i, j int) bool { + return tc.expectedResponse[i].String() < tc.expectedResponse[j].String() + }) + require.Equal(t, tc.expectedResponse, resp.Series) + }) + } + +} + func entries(n int, t time.Time) []logproto.Entry { var result []logproto.Entry for i := 0; i < n; i++ {