From 39cb18f65d8745b7bb4f517299ead9998ddc12a8 Mon Sep 17 00:00:00 2001 From: taisho6339 Date: Fri, 29 Oct 2021 00:45:26 +0900 Subject: [PATCH 1/3] Respect shard number in series api --- pkg/ingester/index/index.go | 37 +++++++++++++++++++++++++------- pkg/ingester/index/index_test.go | 20 +++++++++++++++++ pkg/ingester/instance.go | 16 ++++++++++++-- pkg/ingester/instance_test.go | 19 ++++++++++++++++ 4 files changed, 82 insertions(+), 10 deletions(-) diff --git a/pkg/ingester/index/index.go b/pkg/ingester/index/index.go index 6352f1ff7d3a..c27add128f9e 100644 --- a/pkg/ingester/index/index.go +++ b/pkg/ingester/index/index.go @@ -144,19 +144,23 @@ func labelsString(b *bytes.Buffer, ls labels.Labels) { // Lookup all fingerprints for the provided matchers. func (ii *InvertedIndex) Lookup(matchers []*labels.Matcher, shard *astmapper.ShardAnnotation) ([]model.Fingerprint, error) { - if len(matchers) == 0 { - return nil, nil - } - if err := validateShard(ii.totalShards, shard); err != nil { return nil, err } - result := []model.Fingerprint{} + var result []model.Fingerprint shards := ii.getShards(shard) - for i := range shards { - fps := shards[i].lookup(matchers) - result = append(result, fps...) + // if no matcher is specified, all fingerprints would be returned + if len(matchers) == 0 { + for i := range shards { + fps := shards[i].allFPs() + result = append(result, fps...) + } + } else { + for i := range shards { + fps := shards[i].lookup(matchers) + result = append(result, fps...) + } } return result, nil @@ -310,6 +314,23 @@ func (shard *indexShard) lookup(matchers []*labels.Matcher) []model.Fingerprint return result } +func (shard *indexShard) allFPs() model.Fingerprints { + shard.mtx.RLock() + defer shard.mtx.RUnlock() + + var result model.Fingerprints + for _, ie := range shard.idx { + for _, ive := range ie.fps { + result = intersect(result, ive.fps) + } + sort.Sort(result) + } + if len(result) == 0 { + return nil + } + return result +} + func (shard *indexShard) labelNames() []string { shard.mtx.RLock() defer shard.mtx.RUnlock() diff --git a/pkg/ingester/index/index_test.go b/pkg/ingester/index/index_test.go index 05dbebdeee09..2a5974c8d83a 100644 --- a/pkg/ingester/index/index_test.go +++ b/pkg/ingester/index/index_test.go @@ -114,6 +114,26 @@ func Test_hash_mapping(t *testing.T) { } } +func Test_NoMatcherLookup(t *testing.T) { + lbs := labels.Labels{ + labels.Label{Name: "foo", Value: "bar"}, + labels.Label{Name: "hi", Value: "hello"}, + } + // with no shard param + ii := NewWithShards(16) + ii.Add(cortexpb.FromLabelsToLabelAdapters(lbs), 1) + ids, err := ii.Lookup(nil, nil) + require.Nil(t, err) + require.Equal(t, model.Fingerprint(1), ids[0]) + + // with shard param + ii = NewWithShards(16) + ii.Add(cortexpb.FromLabelsToLabelAdapters(lbs), 1) + ids, err = ii.Lookup(nil, &astmapper.ShardAnnotation{Shard: int(labelsSeriesIDHash(lbs) % 16), Of: 16}) + require.Nil(t, err) + require.Equal(t, model.Fingerprint(1), ids[0]) +} + func Test_ConsistentMapping(t *testing.T) { a := NewWithShards(16) b := NewWithShards(32) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 7b499bd3dfb3..88e6aa76431a 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -419,12 +419,24 @@ func (i *instance) Series(ctx context.Context, req *logproto.SeriesRequest) (*lo return nil, err } + var shard *astmapper.ShardAnnotation + shards, err := logql.ParseShards(req.Shards) + if err != nil { + return nil, err + } + if len(shards) > 1 { + return nil, errors.New("only one shard per ingester query is supported") + } + if len(shards) == 1 { + shard = &shards[0] + } + var series []logproto.SeriesIdentifier // If no matchers were supplied we include all streams. if len(groups) == 0 { series = make([]logproto.SeriesIdentifier, 0, len(i.streams)) - err = i.forAllStreams(ctx, func(stream *stream) error { + err = i.forMatchingStreams(ctx, nil, shard, func(stream *stream) error { // consider the stream only if it overlaps the request time range if shouldConsiderStream(stream, req) { series = append(series, logproto.SeriesIdentifier{ @@ -439,7 +451,7 @@ func (i *instance) Series(ctx context.Context, req *logproto.SeriesRequest) (*lo } else { dedupedSeries := make(map[uint64]logproto.SeriesIdentifier) for _, matchers := range groups { - err = i.forMatchingStreams(ctx, matchers, nil, func(stream *stream) error { + err = i.forMatchingStreams(ctx, matchers, shard, func(stream *stream) error { // consider the stream only if it overlaps the request time range if shouldConsiderStream(stream, req) { // exit early when this stream was added by an earlier group diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index f0bf34844b74..d2dfe57d496f 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -3,6 +3,7 @@ package ingester import ( "context" "fmt" + "github.com/cortexproject/cortex/pkg/querier/astmapper" "math/rand" "runtime" "sort" @@ -160,11 +161,13 @@ func Test_SeriesQuery(t *testing.T) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) + indexShards := 2 // just some random values cfg := defaultConfig() cfg.SyncPeriod = 1 * time.Minute cfg.SyncMinUtilization = 0.20 + cfg.IndexShards = indexShards instance := newInstance(cfg, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil) @@ -212,6 +215,22 @@ func Test_SeriesQuery(t *testing.T) { {Labels: map[string]string{"app": "test2", "job": "varlogs"}}, }, }, + { + "overlapping request with shard param", + &logproto.SeriesRequest{ + Start: currentTime.Add(1 * time.Nanosecond), + End: currentTime.Add(7 * time.Nanosecond), + Groups: []string{`{job="varlogs"}`}, + Shards: []string{astmapper.ShardAnnotation{ + Shard: 1, + Of: indexShards, + }.String()}, + }, + []logproto.SeriesIdentifier{ + // Separated by shard number + {Labels: map[string]string{"app": "test2", "job": "varlogs"}}, + }, + }, { "request end time overlaps stream start time", &logproto.SeriesRequest{ From 5b48e51d545b706ec829f60ed0318debcd35beab Mon Sep 17 00:00:00 2001 From: taisho6339 Date: Fri, 29 Oct 2021 11:04:32 +0900 Subject: [PATCH 2/3] Refactor codes * remove duplicate logic in instance * format with goimport --- pkg/ingester/index/index.go | 11 ++++++----- pkg/ingester/instance.go | 34 +++++++++++++++++----------------- pkg/ingester/instance_test.go | 3 ++- 3 files changed, 25 insertions(+), 23 deletions(-) diff --git a/pkg/ingester/index/index.go b/pkg/ingester/index/index.go index c27add128f9e..7b689c014121 100644 --- a/pkg/ingester/index/index.go +++ b/pkg/ingester/index/index.go @@ -150,19 +150,20 @@ func (ii *InvertedIndex) Lookup(matchers []*labels.Matcher, shard *astmapper.Sha var result []model.Fingerprint shards := ii.getShards(shard) + // if no matcher is specified, all fingerprints would be returned if len(matchers) == 0 { for i := range shards { fps := shards[i].allFPs() result = append(result, fps...) } - } else { - for i := range shards { - fps := shards[i].lookup(matchers) - result = append(result, fps...) - } + return result, nil } + for i := range shards { + fps := shards[i].lookup(matchers) + result = append(result, fps...) + } return result, nil } diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 88e6aa76431a..91a662279d07 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -310,17 +310,10 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) ([]iter ingStats := stats.GetIngesterData(ctx) var iters []iter.EntryIterator - var shard *astmapper.ShardAnnotation - shards, err := logql.ParseShards(req.Shards) + shard, err := parseShardFromRequest(req.Shards) if err != nil { return nil, err } - if len(shards) > 1 { - return nil, errors.New("only one shard per ingester query is supported") - } - if len(shards) == 1 { - shard = &shards[0] - } err = i.forMatchingStreams( ctx, @@ -418,18 +411,10 @@ func (i *instance) Series(ctx context.Context, req *logproto.SeriesRequest) (*lo if err != nil { return nil, err } - - var shard *astmapper.ShardAnnotation - shards, err := logql.ParseShards(req.Shards) + shard, err := parseShardFromRequest(req.Shards) if err != nil { return nil, err } - if len(shards) > 1 { - return nil, errors.New("only one shard per ingester query is supported") - } - if len(shards) == 1 { - shard = &shards[0] - } var series []logproto.SeriesIdentifier @@ -625,6 +610,21 @@ func (i *instance) openTailersCount() uint32 { return uint32(len(i.tailers)) } +func parseShardFromRequest(reqShards []string) (*astmapper.ShardAnnotation, error) { + var shard *astmapper.ShardAnnotation + shards, err := logql.ParseShards(reqShards) + if err != nil { + return nil, err + } + if len(shards) > 1 { + return nil, errors.New("only one shard per ingester query is supported") + } + if len(shards) == 1 { + shard = &shards[0] + } + return shard, nil +} + func isDone(ctx context.Context) bool { select { case <-ctx.Done(): diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index d2dfe57d496f..6970b037c5bf 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -3,7 +3,6 @@ package ingester import ( "context" "fmt" - "github.com/cortexproject/cortex/pkg/querier/astmapper" "math/rand" "runtime" "sort" @@ -11,6 +10,8 @@ import ( "testing" "time" + "github.com/cortexproject/cortex/pkg/querier/astmapper" + "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" "github.com/stretchr/testify/require" From cc1c677d7b72a99f8e89c28efb9fbc13d4c59d4c Mon Sep 17 00:00:00 2001 From: taisho6339 Date: Fri, 29 Oct 2021 11:04:49 +0900 Subject: [PATCH 3/3] Improve complexity of getting all fingerprints --- pkg/ingester/index/index.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/pkg/ingester/index/index.go b/pkg/ingester/index/index.go index 7b689c014121..537ea9a700aa 100644 --- a/pkg/ingester/index/index.go +++ b/pkg/ingester/index/index.go @@ -319,16 +319,24 @@ func (shard *indexShard) allFPs() model.Fingerprints { shard.mtx.RLock() defer shard.mtx.RUnlock() - var result model.Fingerprints + var fps model.Fingerprints for _, ie := range shard.idx { for _, ive := range ie.fps { - result = intersect(result, ive.fps) + fps = append(fps, ive.fps...) } - sort.Sort(result) } - if len(result) == 0 { + if len(fps) == 0 { return nil } + + var result model.Fingerprints + var m = map[model.Fingerprint]struct{}{} + for _, fp := range fps { + if _, ok := m[fp]; !ok { + m[fp] = struct{}{} + result = append(result, fp) + } + } return result }