diff --git a/pkg/ingester/index/index.go b/pkg/ingester/index/index.go index 6352f1ff7d3a..537ea9a700aa 100644 --- a/pkg/ingester/index/index.go +++ b/pkg/ingester/index/index.go @@ -144,21 +144,26 @@ func labelsString(b *bytes.Buffer, ls labels.Labels) { // Lookup all fingerprints for the provided matchers. func (ii *InvertedIndex) Lookup(matchers []*labels.Matcher, shard *astmapper.ShardAnnotation) ([]model.Fingerprint, error) { - if len(matchers) == 0 { - return nil, nil - } - if err := validateShard(ii.totalShards, shard); err != nil { return nil, err } - result := []model.Fingerprint{} + var result []model.Fingerprint shards := ii.getShards(shard) + + // if no matcher is specified, all fingerprints would be returned + if len(matchers) == 0 { + for i := range shards { + fps := shards[i].allFPs() + result = append(result, fps...) + } + return result, nil + } + for i := range shards { fps := shards[i].lookup(matchers) result = append(result, fps...) } - return result, nil } @@ -310,6 +315,31 @@ func (shard *indexShard) lookup(matchers []*labels.Matcher) []model.Fingerprint return result } +func (shard *indexShard) allFPs() model.Fingerprints { + shard.mtx.RLock() + defer shard.mtx.RUnlock() + + var fps model.Fingerprints + for _, ie := range shard.idx { + for _, ive := range ie.fps { + fps = append(fps, ive.fps...) + } + } + if len(fps) == 0 { + return nil + } + + var result model.Fingerprints + var m = map[model.Fingerprint]struct{}{} + for _, fp := range fps { + if _, ok := m[fp]; !ok { + m[fp] = struct{}{} + result = append(result, fp) + } + } + return result +} + func (shard *indexShard) labelNames() []string { shard.mtx.RLock() defer shard.mtx.RUnlock() diff --git a/pkg/ingester/index/index_test.go b/pkg/ingester/index/index_test.go index 05dbebdeee09..2a5974c8d83a 100644 --- a/pkg/ingester/index/index_test.go +++ b/pkg/ingester/index/index_test.go @@ -114,6 +114,26 @@ func Test_hash_mapping(t *testing.T) { } } +func Test_NoMatcherLookup(t *testing.T) { + lbs := labels.Labels{ + labels.Label{Name: "foo", Value: "bar"}, + labels.Label{Name: "hi", Value: "hello"}, + } + // with no shard param + ii := NewWithShards(16) + ii.Add(cortexpb.FromLabelsToLabelAdapters(lbs), 1) + ids, err := ii.Lookup(nil, nil) + require.Nil(t, err) + require.Equal(t, model.Fingerprint(1), ids[0]) + + // with shard param + ii = NewWithShards(16) + ii.Add(cortexpb.FromLabelsToLabelAdapters(lbs), 1) + ids, err = ii.Lookup(nil, &astmapper.ShardAnnotation{Shard: int(labelsSeriesIDHash(lbs) % 16), Of: 16}) + require.Nil(t, err) + require.Equal(t, model.Fingerprint(1), ids[0]) +} + func Test_ConsistentMapping(t *testing.T) { a := NewWithShards(16) b := NewWithShards(32) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 7b499bd3dfb3..91a662279d07 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -310,17 +310,10 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) ([]iter ingStats := stats.GetIngesterData(ctx) var iters []iter.EntryIterator - var shard *astmapper.ShardAnnotation - shards, err := logql.ParseShards(req.Shards) + shard, err := parseShardFromRequest(req.Shards) if err != nil { return nil, err } - if len(shards) > 1 { - return nil, errors.New("only one shard per ingester query is supported") - } - if len(shards) == 1 { - shard = &shards[0] - } err = i.forMatchingStreams( ctx, @@ -418,13 +411,17 @@ func (i *instance) Series(ctx context.Context, req *logproto.SeriesRequest) (*lo if err != nil { return nil, err } + shard, err := parseShardFromRequest(req.Shards) + if err != nil { + return nil, err + } var series []logproto.SeriesIdentifier // If no matchers were supplied we include all streams. if len(groups) == 0 { series = make([]logproto.SeriesIdentifier, 0, len(i.streams)) - err = i.forAllStreams(ctx, func(stream *stream) error { + err = i.forMatchingStreams(ctx, nil, shard, func(stream *stream) error { // consider the stream only if it overlaps the request time range if shouldConsiderStream(stream, req) { series = append(series, logproto.SeriesIdentifier{ @@ -439,7 +436,7 @@ func (i *instance) Series(ctx context.Context, req *logproto.SeriesRequest) (*lo } else { dedupedSeries := make(map[uint64]logproto.SeriesIdentifier) for _, matchers := range groups { - err = i.forMatchingStreams(ctx, matchers, nil, func(stream *stream) error { + err = i.forMatchingStreams(ctx, matchers, shard, func(stream *stream) error { // consider the stream only if it overlaps the request time range if shouldConsiderStream(stream, req) { // exit early when this stream was added by an earlier group @@ -613,6 +610,21 @@ func (i *instance) openTailersCount() uint32 { return uint32(len(i.tailers)) } +func parseShardFromRequest(reqShards []string) (*astmapper.ShardAnnotation, error) { + var shard *astmapper.ShardAnnotation + shards, err := logql.ParseShards(reqShards) + if err != nil { + return nil, err + } + if len(shards) > 1 { + return nil, errors.New("only one shard per ingester query is supported") + } + if len(shards) == 1 { + shard = &shards[0] + } + return shard, nil +} + func isDone(ctx context.Context) bool { select { case <-ctx.Done(): diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index f0bf34844b74..6970b037c5bf 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "github.com/cortexproject/cortex/pkg/querier/astmapper" + "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" "github.com/stretchr/testify/require" @@ -160,11 +162,13 @@ func Test_SeriesQuery(t *testing.T) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) + indexShards := 2 // just some random values cfg := defaultConfig() cfg.SyncPeriod = 1 * time.Minute cfg.SyncMinUtilization = 0.20 + cfg.IndexShards = indexShards instance := newInstance(cfg, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil) @@ -212,6 +216,22 @@ func Test_SeriesQuery(t *testing.T) { {Labels: map[string]string{"app": "test2", "job": "varlogs"}}, }, }, + { + "overlapping request with shard param", + &logproto.SeriesRequest{ + Start: currentTime.Add(1 * time.Nanosecond), + End: currentTime.Add(7 * time.Nanosecond), + Groups: []string{`{job="varlogs"}`}, + Shards: []string{astmapper.ShardAnnotation{ + Shard: 1, + Of: indexShards, + }.String()}, + }, + []logproto.SeriesIdentifier{ + // Separated by shard number + {Labels: map[string]string{"app": "test2", "job": "varlogs"}}, + }, + }, { "request end time overlaps stream start time", &logproto.SeriesRequest{