From ce980305d90dd17d05c23df51cddb3c3c673778d Mon Sep 17 00:00:00 2001
From: mizeng
Date: Mon, 1 Jul 2019 08:41:54 +0800
Subject: [PATCH] Sync with Cortex for S3 path-style URL

---
 Gopkg.lock                                    | 64 +++++++++++++-
 docs/operations.md                            | 10 +++
 .../pkg/chunk/aws/dynamodb_storage_client.go  | 11 ++-
 .../cortex/pkg/chunk/aws/s3_storage_client.go |  2 +
 .../cortex/pkg/chunk/encoding/bigchunk.go     |  8 +-
 .../pkg/chunk/local/boltdb_index_client.go    |  3 +-
 .../cortexproject/cortex/pkg/chunk/schema.go  | 49 +++++------
 .../cortex/pkg/chunk/series_store.go          | 40 +++++----
 .../chunk/storage/caching_index_client.pb.go  | 19 ++---
 .../cortex/pkg/ingester/client/cortex.pb.go   | 85 ++++---------------
 .../cortexproject/cortex/pkg/ring/model.go    |  2 +-
 .../cortexproject/cortex/pkg/ring/ring.go     |  7 +-
 .../cortexproject/cortex/pkg/ring/ring.pb.go  | 13 +--
 13 files changed, 164 insertions(+), 149 deletions(-)

diff --git a/Gopkg.lock b/Gopkg.lock
index c2a8443715f9..1b1866764884 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -222,7 +222,7 @@
 
 [[projects]]
   branch = "master"
-  digest = "1:960e80517f3751439c5304aa501a68dcc81f4e70a7cfa84ff658f5fc939a0c0f"
+  digest = "1:2f0846dd85df3365a80c32ff994eb1fcee5eec2c51a812ceec182398f3ef85f4"
   name = "github.com/cortexproject/cortex"
   packages = [
     "pkg/chunk",
@@ -248,7 +248,7 @@
     "pkg/util/validation",
   ]
   pruneopts = "UT"
-  revision = "5612c6ff1c3142583ae18f47e7af99b1827f758e"
+  revision = "e1ab5495e8a846891e3b6b8e757e63201b886bec"
 
 [[projects]]
   digest = "1:ffe9824d294da03b391f44e1ae8281281b4afc1bdaa9588c9097785e3af10cec"
@@ -560,6 +560,39 @@
   revision = "66b9c49e59c6c48f0ffce28c2d8b8a5678502c6d"
   version = "v1.4.0"
 
+[[projects]]
+  branch = "master"
+  digest = "1:1f4181cfeacebef71babf22e99d727c1667e1f620982787c7035653d6e887dbb"
+  name = "github.com/grafana/loki"
+  packages = [
+    "pkg/chunkenc",
+    "pkg/distributor",
+    "pkg/helpers",
+    "pkg/ingester",
+    "pkg/ingester/client",
+    "pkg/iter",
+    "pkg/logentry/metric",
+    "pkg/logentry/stages",
+    "pkg/logproto",
+    "pkg/logql",
+    "pkg/loki",
+    "pkg/promtail",
+    "pkg/promtail/api",
+    "pkg/promtail/client",
+    "pkg/promtail/client/fake",
+    "pkg/promtail/config",
+    "pkg/promtail/positions",
+    "pkg/promtail/scrape",
+    "pkg/promtail/server",
+    "pkg/promtail/server/ui",
+    "pkg/promtail/targets",
+    "pkg/querier",
+    "pkg/util",
+    "pkg/util/flagext",
+  ]
+  pruneopts = "UT"
+  revision = "4c7138231f77997909564616efc5d0cdbcb1ead8"
+
 [[projects]]
   digest = "1:1168584a5881d371e96cb0e66ef6db71d7cef0856cc7f311490bc856627f8328"
   name = "github.com/grpc-ecosystem/go-grpc-middleware"
@@ -1573,6 +1606,7 @@
     "github.com/docker/docker/api/types/plugins/logdriver",
     "github.com/docker/docker/daemon/logger",
     "github.com/docker/docker/daemon/logger/jsonfilelog",
+    "github.com/docker/docker/daemon/logger/templates",
    "github.com/docker/docker/pkg/ioutils",
     "github.com/docker/go-plugins-helpers/sdk",
     "github.com/fatih/color",
@@ -1585,6 +1619,30 @@
     "github.com/golang/snappy",
     "github.com/gorilla/mux",
     "github.com/gorilla/websocket",
+    "github.com/grafana/loki/pkg/chunkenc",
+    "github.com/grafana/loki/pkg/distributor",
+    "github.com/grafana/loki/pkg/helpers",
+    "github.com/grafana/loki/pkg/ingester",
+    "github.com/grafana/loki/pkg/ingester/client",
+    "github.com/grafana/loki/pkg/iter",
+    "github.com/grafana/loki/pkg/logentry/metric",
+    "github.com/grafana/loki/pkg/logentry/stages",
+    "github.com/grafana/loki/pkg/logproto",
+    "github.com/grafana/loki/pkg/logql",
+    "github.com/grafana/loki/pkg/loki",
+    "github.com/grafana/loki/pkg/promtail",
+    "github.com/grafana/loki/pkg/promtail/api",
"github.com/grafana/loki/pkg/promtail/client", + "github.com/grafana/loki/pkg/promtail/client/fake", + "github.com/grafana/loki/pkg/promtail/config", + "github.com/grafana/loki/pkg/promtail/positions", + "github.com/grafana/loki/pkg/promtail/scrape", + "github.com/grafana/loki/pkg/promtail/server", + "github.com/grafana/loki/pkg/promtail/server/ui", + "github.com/grafana/loki/pkg/promtail/targets", + "github.com/grafana/loki/pkg/querier", + "github.com/grafana/loki/pkg/util", + "github.com/grafana/loki/pkg/util/flagext", "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc", "github.com/hpcloud/tail", "github.com/jmespath/go-jmespath", @@ -1611,12 +1669,12 @@ "github.com/shurcooL/httpfs/filter", "github.com/shurcooL/httpfs/union", "github.com/shurcooL/vfsgen", - "github.com/sirupsen/logrus", "github.com/stretchr/testify/assert", "github.com/stretchr/testify/require", "github.com/tonistiigi/fifo", "github.com/weaveworks/common/httpgrpc", "github.com/weaveworks/common/httpgrpc/server", + "github.com/weaveworks/common/logging", "github.com/weaveworks/common/middleware", "github.com/weaveworks/common/server", "github.com/weaveworks/common/tracing", diff --git a/docs/operations.md b/docs/operations.md index 887345da14a0..a784d33628fa 100644 --- a/docs/operations.md +++ b/docs/operations.md @@ -168,6 +168,16 @@ will stored in `s3://BUCKET_NAME/faker/`. The S3 configuration is setup with url format: `s3://access_key:secret_access_key@region/bucket_name`. +For custom S3 endpoint (like Ceph Object Storage with S3 Compatible API), if it's using path-style url rather than +virtual hosted bucket addressing, please set config like below: + +```yaml +storage_config: + aws: + s3: s3://access_key:secret_access_key@custom_endpoint/bucket_name + s3forcepathstyle: true +``` + #### DynamoDB Loki uses DynamoDB for the index storage. It is used for querying logs, make diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go index 626ce905686d..4810bf3cf60d 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go @@ -126,7 +126,8 @@ func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet) { // StorageConfig specifies config for storing data on AWS. type StorageConfig struct { DynamoDBConfig - S3 flagext.URLValue + S3 flagext.URLValue + S3ForcePathStyle bool } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -135,6 +136,7 @@ func (cfg *StorageConfig) RegisterFlags(f *flag.FlagSet) { f.Var(&cfg.S3, "s3.url", "S3 endpoint URL with escaped Key and Secret encoded. "+ "If only region is specified as a host, proper endpoint will be deduced. Use inmemory:/// to use a mock in-memory implementation.") + f.BoolVar(&cfg.S3ForcePathStyle, "s3.force-path-style", false, "Set this to `true` to force the request to use path-style addressing.") } type dynamoDBStorageClient struct { @@ -270,9 +272,10 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write } // If there are unprocessed items, retry those items. 
- if unprocessedItems := resp.UnprocessedItems; unprocessedItems != nil && dynamoDBWriteBatch(unprocessedItems).Len() > 0 { - logWriteRetry(ctx, dynamoDBWriteBatch(unprocessedItems)) - a.writeThrottle.WaitN(ctx, len(unprocessedItems)) + unprocessedItems := dynamoDBWriteBatch(resp.UnprocessedItems) + if len(unprocessedItems) > 0 { + logWriteRetry(ctx, unprocessedItems) + a.writeThrottle.WaitN(ctx, unprocessedItems.Len()) unprocessed.TakeReqs(unprocessedItems, -1) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/s3_storage_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/s3_storage_client.go index 81a18254e8c8..dfb74a42584f 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/s3_storage_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/s3_storage_client.go @@ -47,6 +47,8 @@ func NewS3ObjectClient(cfg StorageConfig, schemaCfg chunk.SchemaConfig) (chunk.O return nil, err } + s3Config = s3Config.WithS3ForcePathStyle(cfg.S3ForcePathStyle) // support for Path Style S3 url if has the flag + s3Config = s3Config.WithMaxRetries(0) // We do our own retries, so we can monitor them s3Client := s3.New(session.New(s3Config)) bucketName := strings.TrimPrefix(cfg.S3.URL.Path, "/") diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/bigchunk.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/bigchunk.go index 0282c08e9976..b028d6e8ab03 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/bigchunk.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/bigchunk.go @@ -170,9 +170,15 @@ func (b *bigchunk) Size() int { } func (b *bigchunk) NewIterator() Iterator { + var it chunkenc.Iterator + if len(b.chunks) > 0 { + it = b.chunks[0].Iterator() + } else { + it = chunkenc.NewNopIterator() + } return &bigchunkIterator{ bigchunk: b, - curr: b.chunks[0].Iterator(), + curr: it, } } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/local/boltdb_index_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/local/boltdb_index_client.go index 55f5416212ab..624e6ff4edac 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/local/boltdb_index_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/local/boltdb_index_client.go @@ -140,7 +140,8 @@ func (b *boltIndexClient) getDB(name string) (*bbolt.DB, error) { } // Open the database. - db, err := bbolt.Open(path.Join(b.cfg.Directory, name), 0666, nil) + // Set Timeout to avoid obtaining file lock wait indefinitely. + db, err := bbolt.Open(path.Join(b.cfg.Directory, name), 0666, &bbolt.Options{Timeout: 5 * time.Second}) if err != nil { return nil, err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/schema.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/schema.go index 58e23e8556f3..4e3142c5e501 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/schema.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/schema.go @@ -34,9 +34,9 @@ type Schema interface { GetWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) // Should only be used with the seriesStore. TODO: Make seriesStore implement a different interface altogether. 
-	GetLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
+	// returns a cache key and a []IndexEntry per bucket, matched in order
+	GetCacheKeysAndLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]string, [][]IndexEntry, error)
 	GetChunkWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
-	GetLabelEntryCacheKeys(from, through model.Time, userID string, labels labels.Labels) []string
 
 	// When doing a read, use these methods to return the list of entries you should query
 	GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error)
@@ -97,17 +97,31 @@
 	return result, nil
 }
 
-func (s schema) GetLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
-	var result []IndexEntry
+// returns a cache key and a []IndexEntry per bucket, matched in order
+func (s schema) GetCacheKeysAndLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]string, [][]IndexEntry, error) {
+	var keys []string
+	var indexEntries [][]IndexEntry
 
 	for _, bucket := range s.buckets(from, through, userID) {
+		key := strings.Join([]string{
+			bucket.tableName,
+			bucket.hashKey,
+			string(labelsSeriesID(labels)),
+		},
+			"-",
+		)
+		// This is just encoding to remove invalid characters so that we can put them in memcache.
+		// We're not hashing them as the length of the key is well within memcache bounds. tableName + userid + day + 32Byte(seriesID)
+		key = hex.EncodeToString([]byte(key))
+		keys = append(keys, key)
+
 		entries, err := s.entries.GetLabelWriteEntries(bucket, metricName, labels, chunkID)
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
-		result = append(result, entries...)
+		indexEntries = append(indexEntries, entries)
 	}
-	return result, nil
+	return keys, indexEntries, nil
 }
 
 func (s schema) GetChunkWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
@@ -124,27 +138,6 @@
 
 }
 
-// Should only used for v9Schema
-func (s schema) GetLabelEntryCacheKeys(from, through model.Time, userID string, labels labels.Labels) []string {
-	var result []string
-	for _, bucket := range s.buckets(from, through, userID) {
-		key := strings.Join([]string{
-			bucket.tableName,
-			bucket.hashKey,
-			string(labelsSeriesID(labels)),
-		},
-			"-",
-		)
-		// This is just encoding to remove invalid characters so that we can put them in memcache.
-		// We're not hashing them as the length of the key is well within memcache bounds. tableName + userid + day + 32Byte(seriesID)
-		key = hex.EncodeToString([]byte(key))
-
-		result = append(result, key)
-	}
-
-	return result
-}
-
 func (s schema) GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error) {
 	var result []IndexQuery
 
diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go
index 2c1ffb2efc7b..5462a0fc38fc 100644
--- a/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go
+++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go
@@ -42,22 +42,22 @@ var (
 		Namespace: "cortex",
 		Name:      "chunk_store_series_pre_intersection_per_query",
 		Help:      "Distribution of #series (pre intersection) per query.",
-		// A reasonable upper bound is around 100k - 10*(8^5) = 327k.
-		Buckets: prometheus.ExponentialBuckets(10, 8, 5),
+		// A reasonable upper bound is around 100k - 10*(8^(6-1)) = 327k.
+		Buckets: prometheus.ExponentialBuckets(10, 8, 6),
 	})
 	postIntersectionPerQuery = promauto.NewHistogram(prometheus.HistogramOpts{
 		Namespace: "cortex",
 		Name:      "chunk_store_series_post_intersection_per_query",
 		Help:      "Distribution of #series (post intersection) per query.",
-		// A reasonable upper bound is around 100k - 10*(8^5) = 327k.
-		Buckets: prometheus.ExponentialBuckets(10, 8, 5),
+		// A reasonable upper bound is around 100k - 10*(8^(6-1)) = 327k.
+		Buckets: prometheus.ExponentialBuckets(10, 8, 6),
 	})
 	chunksPerQuery = promauto.NewHistogram(prometheus.HistogramOpts{
 		Namespace: "cortex",
 		Name:      "chunk_store_chunks_per_query",
 		Help:      "Distribution of #chunks per query.",
-		// For 100k series for 7 week, could be 1.2m - 10*(8^6) = 2.6m.
-		Buckets: prometheus.ExponentialBuckets(10, 8, 6),
+		// For 100k series over 7 weeks, could be 1.2m - 10*(8^(7-1)) = 2.6m.
+		Buckets: prometheus.ExponentialBuckets(10, 8, 7),
 	})
 )
 
@@ -341,6 +341,12 @@ func (c *seriesStore) Put(ctx context.Context, chunks []Chunk) error {
 
 // PutOne implements ChunkStore
 func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error {
+	// If this chunk is in the cache it must already be in the database, so we don't need to write it again.
+	found, _, _ := c.cache.Fetch(ctx, []string{chunk.ExternalKey()})
+	if len(found) > 0 {
+		return nil
+	}
+
 	chunks := []Chunk{chunk}
 
 	writeReqs, keysToCache, err := c.calculateIndexEntries(from, through, chunk)
@@ -372,23 +378,25 @@
 func (c *seriesStore) calculateIndexEntries(from, through model.Time, chunk Chunk) (WriteBatch, []string, error) {
 	seenIndexEntries := map[string]struct{}{}
 	entries := []IndexEntry{}
-	keysToCache := []string{}
 
 	metricName := chunk.Metric.Get(labels.MetricName)
 	if metricName == "" {
 		return nil, nil, fmt.Errorf("no MetricNameLabel for chunk")
 	}
 
-	keys := c.schema.GetLabelEntryCacheKeys(from, through, chunk.UserID, chunk.Metric)
+	keys, labelEntries, err := c.schema.GetCacheKeysAndLabelWriteEntries(from, through, chunk.UserID, metricName, chunk.Metric, chunk.ExternalKey())
+	if err != nil {
+		return nil, nil, err
+	}
 	_, _, missing := c.writeDedupeCache.Fetch(context.Background(), keys)
-	if len(missing) != 0 {
-		labelEntries, err := c.schema.GetLabelWriteEntries(from, through, chunk.UserID, metricName, chunk.Metric, chunk.ExternalKey())
-		if err != nil {
-			return nil, nil, err
+	// keys and labelEntries are matched in order, but Fetch() may
+	// return missing keys in any order, so check against all of them.
+	for _, missingKey := range missing {
+		for i, key := range keys {
+			if key == missingKey {
+				entries = append(entries, labelEntries[i]...)
+			}
 		}
-
-		entries = append(entries, labelEntries...)
-		keysToCache = missing
 	}
 
 	chunkEntries, err := c.schema.GetChunkWriteEntries(from, through, chunk.UserID, metricName, chunk.Metric, chunk.ExternalKey())
@@ -410,5 +418,5 @@ func (c *seriesStore) calculateIndexEntries(from, through model.Time, chun
 		}
 	}
 
-	return result, keysToCache, nil
+	return result, missing, nil
 }
diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_index_client.pb.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_index_client.pb.go
index 96a81e59cbf0..6f60ba68010a 100644
--- a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_index_client.pb.go
+++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_index_client.pb.go
@@ -283,17 +283,17 @@ func (m *Entry) MarshalTo(dAtA []byte) (int, error) {
 	dAtA[i] = 0xa
 	i++
 	i = encodeVarintCachingIndexClient(dAtA, i, uint64(m.Column.Size()))
-	n1, err1 := m.Column.MarshalTo(dAtA[i:])
-	if err1 != nil {
-		return 0, err1
+	n1, err := m.Column.MarshalTo(dAtA[i:])
+	if err != nil {
+		return 0, err
 	}
 	i += n1
 	dAtA[i] = 0x12
 	i++
 	i = encodeVarintCachingIndexClient(dAtA, i, uint64(m.Value.Size()))
-	n2, err2 := m.Value.MarshalTo(dAtA[i:])
-	if err2 != nil {
-		return 0, err2
+	n2, err := m.Value.MarshalTo(dAtA[i:])
+	if err != nil {
+		return 0, err
 	}
 	i += n2
 	return i, nil
@@ -420,13 +420,8 @@ func (this *ReadBatch) String() string {
 	if this == nil {
 		return "nil"
 	}
-	repeatedStringForEntries := "[]Entry{"
-	for _, f := range this.Entries {
-		repeatedStringForEntries += strings.Replace(strings.Replace(f.String(), "Entry", "Entry", 1), `&`, ``, 1) + ","
-	}
-	repeatedStringForEntries += "}"
 	s := strings.Join([]string{`&ReadBatch{`,
-		`Entries:` + repeatedStringForEntries + `,`,
+		`Entries:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Entries), "Entry", "Entry", 1), `&`, ``, 1) + `,`,
 		`Key:` + fmt.Sprintf("%v", this.Key) + `,`,
 		`Expiry:` + fmt.Sprintf("%v", this.Expiry) + `,`,
 		`Cardinality:` + fmt.Sprintf("%v", this.Cardinality) + `,`,
diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/client/cortex.pb.go b/vendor/github.com/cortexproject/cortex/pkg/ingester/client/cortex.pb.go
index 5b9106905f5a..0024675d96cf 100644
--- a/vendor/github.com/cortexproject/cortex/pkg/ingester/client/cortex.pb.go
+++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/client/cortex.pb.go
@@ -3311,9 +3311,9 @@ func (m *UserIDStatsResponse) MarshalTo(dAtA []byte) (int, error) {
 		dAtA[i] = 0x12
 		i++
 		i = encodeVarintCortex(dAtA, i, uint64(m.Data.Size()))
-		n1, err1 := m.Data.MarshalTo(dAtA[i:])
-		if err1 != nil {
-			return 0, err1
+		n1, err := m.Data.MarshalTo(dAtA[i:])
+		if err != nil {
+			return 0, err
 		}
 		i += n1
 	}
@@ -4194,13 +4194,8 @@ func (this *ReadRequest) String() string {
 	if this == nil {
 		return "nil"
 	}
-	repeatedStringForQueries := "[]*QueryRequest{"
-	for _, f := range this.Queries {
-		repeatedStringForQueries += strings.Replace(f.String(), "QueryRequest", "QueryRequest", 1) + ","
-	}
-	repeatedStringForQueries += "}"
 	s := strings.Join([]string{`&ReadRequest{`,
-		`Queries:` + repeatedStringForQueries + `,`,
+		`Queries:` + strings.Replace(fmt.Sprintf("%v", this.Queries), "QueryRequest", "QueryRequest", 1) + `,`,
 		`}`,
 	}, "")
 	return s
@@ -4209,13 +4204,8 @@ func (this *ReadResponse) String() string {
 	if this == nil {
 		return "nil"
 	}
-	repeatedStringForResults := "[]*QueryResponse{"
-	for _, f := range this.Results {
-		repeatedStringForResults += strings.Replace(f.String(), "QueryResponse", "QueryResponse", 1) + ","
-	}
-	repeatedStringForResults += "}"
 	s := strings.Join([]string{`&ReadResponse{`,
-		`Results:` + repeatedStringForResults + `,`,
+		`Results:` + strings.Replace(fmt.Sprintf("%v", this.Results), "QueryResponse", "QueryResponse", 1) + `,`,
 		`}`,
 	}, "")
 	return s
@@ -4224,15 +4214,10 @@ func (this *QueryRequest) String() string {
 	if this == nil {
 		return "nil"
 	}
-	repeatedStringForMatchers := "[]*LabelMatcher{"
-	for _, f := range this.Matchers {
-		repeatedStringForMatchers += strings.Replace(f.String(), "LabelMatcher", "LabelMatcher", 1) + ","
-	}
-	repeatedStringForMatchers += "}"
 	s := strings.Join([]string{`&QueryRequest{`,
 		`StartTimestampMs:` + fmt.Sprintf("%v", this.StartTimestampMs) + `,`,
 		`EndTimestampMs:` + fmt.Sprintf("%v", this.EndTimestampMs) + `,`,
-		`Matchers:` + repeatedStringForMatchers + `,`,
+		`Matchers:` + strings.Replace(fmt.Sprintf("%v", this.Matchers), "LabelMatcher", "LabelMatcher", 1) + `,`,
 		`}`,
 	}, "")
 	return s
@@ -4241,13 +4226,8 @@ func (this *QueryResponse) String() string {
 	if this == nil {
 		return "nil"
 	}
-	repeatedStringForTimeseries := "[]TimeSeries{"
-	for _, f := range this.Timeseries {
-		repeatedStringForTimeseries += strings.Replace(strings.Replace(f.String(), "TimeSeries", "TimeSeries", 1), `&`, ``, 1) + ","
-	}
-	repeatedStringForTimeseries += "}"
 	s := strings.Join([]string{`&QueryResponse{`,
-		`Timeseries:` + repeatedStringForTimeseries + `,`,
+		`Timeseries:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Timeseries), "TimeSeries", "TimeSeries", 1), `&`, ``, 1) + `,`,
 		`}`,
 	}, "")
 	return s
@@ -4256,13 +4236,8 @@ func (this *QueryStreamResponse) String() string {
 	if this == nil {
 		return "nil"
 	}
-	repeatedStringForTimeseries := "[]TimeSeriesChunk{"
-	for _, f := range this.Timeseries {
-		repeatedStringForTimeseries += strings.Replace(strings.Replace(f.String(), "TimeSeriesChunk", "TimeSeriesChunk", 1), `&`, ``, 1) + ","
-	}
-	repeatedStringForTimeseries += "}"
 	s := strings.Join([]string{`&QueryStreamResponse{`,
-		`Timeseries:` + repeatedStringForTimeseries + `,`,
+		`Timeseries:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Timeseries), "TimeSeriesChunk", "TimeSeriesChunk", 1), `&`, ``, 1) + `,`,
 		`}`,
 	}, "")
 	return s
@@ -4334,7 +4309,7 @@ func (this *UserIDStatsResponse) String() string {
 	}
 	s := strings.Join([]string{`&UserIDStatsResponse{`,
 		`UserId:` + fmt.Sprintf("%v", this.UserId) + `,`,
-		`Data:` + strings.Replace(this.Data.String(), "UserStatsResponse", "UserStatsResponse", 1) + `,`,
+		`Data:` + strings.Replace(fmt.Sprintf("%v", this.Data), "UserStatsResponse", "UserStatsResponse", 1) + `,`,
 		`}`,
 	}, "")
 	return s
@@ -4343,13 +4318,8 @@ func (this *UsersStatsResponse) String() string {
 	if this == nil {
 		return "nil"
 	}
-	repeatedStringForStats := "[]*UserIDStatsResponse{"
-	for _, f := range this.Stats {
-		repeatedStringForStats += strings.Replace(f.String(), "UserIDStatsResponse", "UserIDStatsResponse", 1) + ","
-	}
-	repeatedStringForStats += "}"
 	s := strings.Join([]string{`&UsersStatsResponse{`,
-		`Stats:` + repeatedStringForStats + `,`,
+		`Stats:` + strings.Replace(fmt.Sprintf("%v", this.Stats), "UserIDStatsResponse", "UserIDStatsResponse", 1) + `,`,
 		`}`,
 	}, "")
 	return s
@@ -4358,15 +4328,10 @@ func (this *MetricsForLabelMatchersRequest) String() string {
 	if this == nil {
 		return "nil"
 	}
-	repeatedStringForMatchersSet := "[]*LabelMatchers{"
-	for _, f := range this.MatchersSet {
-		repeatedStringForMatchersSet += strings.Replace(f.String(), "LabelMatchers", "LabelMatchers", 1) + ","
-	}
-	repeatedStringForMatchersSet += "}"
 	s := strings.Join([]string{`&MetricsForLabelMatchersRequest{`,
 		`StartTimestampMs:` + fmt.Sprintf("%v", this.StartTimestampMs) + `,`,
 		`EndTimestampMs:` + fmt.Sprintf("%v", this.EndTimestampMs) + `,`,
-		`MatchersSet:` + repeatedStringForMatchersSet + `,`,
+		`MatchersSet:` + strings.Replace(fmt.Sprintf("%v", this.MatchersSet), "LabelMatchers", "LabelMatchers", 1) + `,`,
 		`}`,
 	}, "")
 	return s
@@ -4375,13 +4340,8 @@ func (this *MetricsForLabelMatchersResponse) String() string {
 	if this == nil {
 		return "nil"
 	}
-	repeatedStringForMetric := "[]*Metric{"
-	for _, f := range this.Metric {
-		repeatedStringForMetric += strings.Replace(f.String(), "Metric", "Metric", 1) + ","
-	}
-	repeatedStringForMetric += "}"
 	s := strings.Join([]string{`&MetricsForLabelMatchersResponse{`,
-		`Metric:` + repeatedStringForMetric + `,`,
+		`Metric:` + strings.Replace(fmt.Sprintf("%v", this.Metric), "Metric", "Metric", 1) + `,`,
 		`}`,
 	}, "")
 	return s
@@ -4390,16 +4350,11 @@ func (this *TimeSeriesChunk) String() string {
 	if this == nil {
 		return "nil"
 	}
-	repeatedStringForChunks := "[]Chunk{"
-	for _, f := range this.Chunks {
-		repeatedStringForChunks += strings.Replace(strings.Replace(f.String(), "Chunk", "Chunk", 1), `&`, ``, 1) + ","
-	}
-	repeatedStringForChunks += "}"
 	s := strings.Join([]string{`&TimeSeriesChunk{`,
 		`FromIngesterId:` + fmt.Sprintf("%v", this.FromIngesterId) + `,`,
 		`UserId:` + fmt.Sprintf("%v", this.UserId) + `,`,
 		`Labels:` + fmt.Sprintf("%v", this.Labels) + `,`,
-		`Chunks:` + repeatedStringForChunks + `,`,
+		`Chunks:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Chunks), "Chunk", "Chunk", 1), `&`, ``, 1) + `,`,
 		`}`,
 	}, "")
 	return s
@@ -4430,14 +4385,9 @@ func (this *TimeSeries) String() string {
 	if this == nil {
 		return "nil"
 	}
-	repeatedStringForSamples := "[]Sample{"
-	for _, f := range this.Samples {
-		repeatedStringForSamples += strings.Replace(strings.Replace(f.String(), "Sample", "Sample", 1), `&`, ``, 1) + ","
-	}
-	repeatedStringForSamples += "}"
 	s := strings.Join([]string{`&TimeSeries{`,
 		`Labels:` + fmt.Sprintf("%v", this.Labels) + `,`,
-		`Samples:` + repeatedStringForSamples + `,`,
+		`Samples:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Samples), "Sample", "Sample", 1), `&`, ``, 1) + `,`,
 		`}`,
 	}, "")
 	return s
@@ -4468,13 +4418,8 @@ func (this *LabelMatchers) String() string {
 	if this == nil {
 		return "nil"
 	}
-	repeatedStringForMatchers := "[]*LabelMatcher{"
-	for _, f := range this.Matchers {
-		repeatedStringForMatchers += strings.Replace(f.String(), "LabelMatcher", "LabelMatcher", 1) + ","
-	}
-	repeatedStringForMatchers += "}"
 	s := strings.Join([]string{`&LabelMatchers{`,
-		`Matchers:` + repeatedStringForMatchers + `,`,
+		`Matchers:` + strings.Replace(fmt.Sprintf("%v", this.Matchers), "LabelMatcher", "LabelMatcher", 1) + `,`,
 		`}`,
 	}, "")
 	return s
diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/model.go b/vendor/github.com/cortexproject/cortex/pkg/ring/model.go
index 5644acaf51d7..82d0de2f319e 100644
--- a/vendor/github.com/cortexproject/cortex/pkg/ring/model.go
+++ b/vendor/github.com/cortexproject/cortex/pkg/ring/model.go
@@ -143,7 +143,7 @@ func (d *Desc) Ready(heartbeatTimeout time.Duration) error {
 // TokensFor partitions the tokens into those for the given ID, and those for others.
 func (d *Desc) TokensFor(id string) (tokens, other []uint32) {
 	var takenTokens, myTokens []uint32
-	for _, token := range d.Tokens {
+	for _, token := range migrateRing(d) {
 		takenTokens = append(takenTokens, token.Token)
 		if token.Ingester == id {
 			myTokens = append(myTokens, token.Token)
diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/ring.go b/vendor/github.com/cortexproject/cortex/pkg/ring/ring.go
index 61c1aa312876..46bdfc01d823 100644
--- a/vendor/github.com/cortexproject/cortex/pkg/ring/ring.go
+++ b/vendor/github.com/cortexproject/cortex/pkg/ring/ring.go
@@ -150,7 +150,7 @@ func (r *Ring) loop(ctx context.Context) {
 		}
 
 		ringDesc := value.(*Desc)
-		ringDesc = r.migrateRing(ringDesc)
+		ringDesc.Tokens = migrateRing(ringDesc)
 		r.mtx.Lock()
 		defer r.mtx.Unlock()
 		r.ringDesc = ringDesc
@@ -159,7 +159,7 @@
 }
 
 // migrateRing will denormalise the ring's tokens if stored in normal form.
-func (r *Ring) migrateRing(desc *Desc) *Desc {
+func migrateRing(desc *Desc) []TokenDesc {
 	numTokens := len(desc.Tokens)
 	for _, ing := range desc.Ingesters {
 		numTokens += len(ing.Tokens)
@@ -175,8 +175,7 @@
 		}
 	}
 	sort.Sort(ByToken(tokens))
-	desc.Tokens = tokens
-	return desc
+	return tokens
 }
 
 // Get returns n (or more) ingesters which form the replicas for the given key.
diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/ring.pb.go b/vendor/github.com/cortexproject/cortex/pkg/ring/ring.pb.go
index ea26b706badf..3022e80c99ca 100644
--- a/vendor/github.com/cortexproject/cortex/pkg/ring/ring.pb.go
+++ b/vendor/github.com/cortexproject/cortex/pkg/ring/ring.pb.go
@@ -469,9 +469,9 @@ func (m *Desc) MarshalTo(dAtA []byte) (int, error) {
 			dAtA[i] = 0x12
 			i++
 			i = encodeVarintRing(dAtA, i, uint64((&v).Size()))
-			n1, err1 := (&v).MarshalTo(dAtA[i:])
-			if err1 != nil {
-				return 0, err1
+			n1, err := (&v).MarshalTo(dAtA[i:])
+			if err != nil {
+				return 0, err
 			}
 			i += n1
 		}
@@ -663,11 +663,6 @@ func (this *Desc) String() string {
 	if this == nil {
 		return "nil"
 	}
-	repeatedStringForTokens := "[]TokenDesc{"
-	for _, f := range this.Tokens {
-		repeatedStringForTokens += strings.Replace(strings.Replace(f.String(), "TokenDesc", "TokenDesc", 1), `&`, ``, 1) + ","
-	}
-	repeatedStringForTokens += "}"
 	keysForIngesters := make([]string, 0, len(this.Ingesters))
 	for k, _ := range this.Ingesters {
 		keysForIngesters = append(keysForIngesters, k)
 	}
@@ -680,7 +675,7 @@
 	mapStringForIngesters += "}"
 	s := strings.Join([]string{`&Desc{`,
 		`Ingesters:` + mapStringForIngesters + `,`,
-		`Tokens:` + repeatedStringForTokens + `,`,
+		`Tokens:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Tokens), "TokenDesc", "TokenDesc", 1), `&`, ``, 1) + `,`,
 		`}`,
 	}, "")
 	return s
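
The new `s3forcepathstyle` option maps directly onto aws-sdk-go's `WithS3ForcePathStyle` call added in `s3_storage_client.go`. A minimal sketch of what the flag changes, assuming a hypothetical `ceph.example.com` endpoint and an illustrative bucket name (neither comes from this patch):

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	// With path style forced, object URLs resolve as
	// https://ceph.example.com/bucket_name/key. With the default
	// virtual-hosted style, the SDK would instead build
	// https://bucket_name.ceph.example.com/key, which requires wildcard
	// DNS that most self-hosted S3-compatible stores do not provide.
	cfg := aws.NewConfig().
		WithEndpoint("ceph.example.com").
		WithRegion("us-east-1"). // a region is still required by the SDK
		WithS3ForcePathStyle(true)

	client := s3.New(session.Must(session.NewSession(cfg)))
	fmt.Println(client.ClientInfo.Endpoint)
}
```

On the command line, the same behaviour is reached through the new `-s3.force-path-style` flag registered in `StorageConfig.RegisterFlags`.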
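The `BatchWrite` hunk in `dynamodb_storage_client.go` is more than a tidy-up: a `dynamoDBWriteBatch` is a map from table name to write requests, so the old `a.writeThrottle.WaitN(ctx, len(unprocessedItems))` throttled on the number of tables rather than the number of requests. A self-contained sketch of the difference, with the types re-declared here for illustration on the assumption that the real `Len()` sums requests across tables, which is how the hunk reads:

```go
package main

import "fmt"

// writeRequest stands in for *dynamodb.WriteRequest in this sketch.
type writeRequest struct{}

// dynamoDBWriteBatch mirrors the shape of the real type: table name -> requests.
type dynamoDBWriteBatch map[string][]writeRequest

// Len counts individual requests across all tables, which is what the
// write throttle should actually wait on.
func (b dynamoDBWriteBatch) Len() int {
	result := 0
	for _, reqs := range b {
		result += len(reqs)
	}
	return result
}

func main() {
	batch := dynamoDBWriteBatch{"index_2564": {{}, {}, {}}}
	fmt.Println(len(batch), batch.Len()) // 1 3: the throttle must wait on 3, not 1
}
```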
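The cache-key construction folded into `GetCacheKeysAndLabelWriteEntries` hex-encodes the joined key because memcached rejects keys containing spaces or control characters. Hex-encoding doubles the length, but `tableName + userid + day + 32Byte(seriesID)` stays comfortably under memcached's usual 250-byte key limit, so no hashing is needed. A tiny illustration with a made-up key in that shape:

```go
package main

import (
	"encoding/hex"
	"fmt"
)

func main() {
	// A made-up tableName-hashKey-seriesID key; not taken from the patch.
	key := "index_2564-fake:d18500:logs-5j2DyjA7tXVnwfLBKhyDldw="
	fmt.Println(hex.EncodeToString([]byte(key))) // memcached-safe, roughly twice as long
}
```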
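The rewritten histogram comments in `series_store.go` spell out the bucket arithmetic: `prometheus.ExponentialBuckets(start, factor, count)` produces `start * factor^i` for `i` in `[0, count)`, so the largest bucket is `start * factor^(count-1)`. A quick check of the numbers in those comments, with the helper re-implemented locally rather than imported so the sketch stands alone:

```go
package main

import "fmt"

// exponentialBuckets mimics prometheus.ExponentialBuckets for this check.
func exponentialBuckets(start, factor float64, count int) []float64 {
	buckets := make([]float64, count)
	for i := range buckets {
		buckets[i] = start
		start *= factor
	}
	return buckets
}

func main() {
	fmt.Println(exponentialBuckets(10, 8, 6)) // [10 80 640 5120 40960 327680], i.e. ~327k
	fmt.Println(exponentialBuckets(10, 8, 7)) // largest bucket 2.62144e+06, i.e. ~2.6m
}
```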
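Finally, the nested loop in `calculateIndexEntries` that matches `missing` cache keys back to `labelEntries` is O(len(keys) x len(missing)); with one key per bucket that product is tiny, so the patch keeps it simple. If bucket counts ever grew, an index map would do the same job in linear time. A sketch of that alternative (a hypothetical helper, not part of the patch; `IndexEntry` is the chunk package's type):

```go
// collectMissingEntries gathers the label entries whose cache keys were
// reported missing. keys[i] pairs with labelEntries[i], per the
// GetCacheKeysAndLabelWriteEntries contract.
func collectMissingEntries(keys []string, labelEntries [][]IndexEntry, missing []string) []IndexEntry {
	indexByKey := make(map[string]int, len(keys))
	for i, key := range keys {
		indexByKey[key] = i
	}
	var entries []IndexEntry
	for _, missingKey := range missing {
		if i, ok := indexByKey[missingKey]; ok {
			entries = append(entries, labelEntries[i]...)
		}
	}
	return entries
}
```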