Skip to content

Commit

Permalink
Refactor codes
Browse files Browse the repository at this point in the history
* remove duplicate logic in instance
* format with goimport
  • Loading branch information
taisho6339 committed Oct 29, 2021
1 parent 39cb18f commit 5b48e51
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 23 deletions.
11 changes: 6 additions & 5 deletions pkg/ingester/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,19 +150,20 @@ func (ii *InvertedIndex) Lookup(matchers []*labels.Matcher, shard *astmapper.Sha

var result []model.Fingerprint
shards := ii.getShards(shard)

// if no matcher is specified, all fingerprints would be returned
if len(matchers) == 0 {
for i := range shards {
fps := shards[i].allFPs()
result = append(result, fps...)
}
} else {
for i := range shards {
fps := shards[i].lookup(matchers)
result = append(result, fps...)
}
return result, nil
}

for i := range shards {
fps := shards[i].lookup(matchers)
result = append(result, fps...)
}
return result, nil
}

Expand Down
34 changes: 17 additions & 17 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,17 +310,10 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) ([]iter
ingStats := stats.GetIngesterData(ctx)
var iters []iter.EntryIterator

var shard *astmapper.ShardAnnotation
shards, err := logql.ParseShards(req.Shards)
shard, err := parseShardFromRequest(req.Shards)
if err != nil {
return nil, err
}
if len(shards) > 1 {
return nil, errors.New("only one shard per ingester query is supported")
}
if len(shards) == 1 {
shard = &shards[0]
}

err = i.forMatchingStreams(
ctx,
Expand Down Expand Up @@ -418,18 +411,10 @@ func (i *instance) Series(ctx context.Context, req *logproto.SeriesRequest) (*lo
if err != nil {
return nil, err
}

var shard *astmapper.ShardAnnotation
shards, err := logql.ParseShards(req.Shards)
shard, err := parseShardFromRequest(req.Shards)
if err != nil {
return nil, err
}
if len(shards) > 1 {
return nil, errors.New("only one shard per ingester query is supported")
}
if len(shards) == 1 {
shard = &shards[0]
}

var series []logproto.SeriesIdentifier

Expand Down Expand Up @@ -625,6 +610,21 @@ func (i *instance) openTailersCount() uint32 {
return uint32(len(i.tailers))
}

func parseShardFromRequest(reqShards []string) (*astmapper.ShardAnnotation, error) {
var shard *astmapper.ShardAnnotation
shards, err := logql.ParseShards(reqShards)
if err != nil {
return nil, err
}
if len(shards) > 1 {
return nil, errors.New("only one shard per ingester query is supported")
}
if len(shards) == 1 {
shard = &shards[0]
}
return shard, nil
}

func isDone(ctx context.Context) bool {
select {
case <-ctx.Done():
Expand Down
3 changes: 2 additions & 1 deletion pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package ingester
import (
"context"
"fmt"
"github.com/cortexproject/cortex/pkg/querier/astmapper"
"math/rand"
"runtime"
"sort"
"sync"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/querier/astmapper"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/require"
Expand Down

0 comments on commit 5b48e51

Please sign in to comment.