diff --git a/go.mod b/go.mod index 67df5d734264..42afbd9f3ce3 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/bmatcuk/doublestar v1.1.1 github.com/containerd/fifo v0.0.0-20190226154929-a9fb20d87448 // indirect github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e - github.com/cortexproject/cortex v0.2.1-0.20191003165238-857bb8476e59 + github.com/cortexproject/cortex v0.3.1-0.20191025190927-77a09cc7c953 github.com/davecgh/go-spew v1.1.1 github.com/docker/distribution v2.7.1+incompatible // indirect github.com/docker/docker v0.0.0-20190607191414-238f8eaa31aa diff --git a/go.sum b/go.sum index 09b27364643f..09dee2e3f746 100644 --- a/go.sum +++ b/go.sum @@ -96,8 +96,8 @@ github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7 github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= -github.com/cortexproject/cortex v0.2.1-0.20191003165238-857bb8476e59 h1:DPfMYL5cV21JIaFtf64szezjkopANcwiQmeMZVCbStg= -github.com/cortexproject/cortex v0.2.1-0.20191003165238-857bb8476e59/go.mod h1:XLeoQLsfseLmVzRpZ6MIuoUOTAC979R7WSdBdnwe800= +github.com/cortexproject/cortex v0.3.1-0.20191025190927-77a09cc7c953 h1:V6cRjz6Kx4lmv5xkWdgNzhDwWxFU4nl9ttSX+9YhJJE= +github.com/cortexproject/cortex v0.3.1-0.20191025190927-77a09cc7c953/go.mod h1:XLeoQLsfseLmVzRpZ6MIuoUOTAC979R7WSdBdnwe800= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/cznic/b v0.0.0-20180115125044-35e9bbe41f07/go.mod h1:URriBxXwVq5ijiJ12C7iIZqlA69nTlI+LgI6/pwftG8= github.com/cznic/fileutil v0.0.0-20180108211300-6a051e75936f/go.mod h1:8S58EK26zhXSxzv7NQFpnliaOQsmDUxvoQO3rt154Vg= diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go index 7d3adeca2c2b..35b0ecf8fc8d 100644 --- a/pkg/distributor/http.go +++ b/pkg/distributor/http.go @@ -1,6 +1,7 @@ package distributor import ( + "math" "net/http" "github.com/weaveworks/common/httpgrpc" @@ -37,7 +38,7 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { } default: - if _, err := util.ParseProtoReader(r.Context(), r.Body, &req, util.RawSnappy); err != nil { + if _, err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } diff --git a/pkg/promtail/client/client_test.go b/pkg/promtail/client/client_test.go index f03e06fd7c13..84a935dd87c1 100644 --- a/pkg/promtail/client/client_test.go +++ b/pkg/promtail/client/client_test.go @@ -1,6 +1,7 @@ package client import ( + "math" "net/http" "net/http/httptest" "strings" @@ -243,7 +244,7 @@ func createServerHandler(receivedReqsChan chan logproto.PushRequest, status int) return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { // Parse the request var pushReq logproto.PushRequest - if _, err := util.ParseProtoReader(req.Context(), req.Body, &pushReq, util.RawSnappy); err != nil { + if _, err := util.ParseProtoReader(req.Context(), req.Body, int(req.ContentLength), math.MaxInt32, &pushReq, util.RawSnappy); err != nil { rw.WriteHeader(500) return } diff --git a/pkg/promtail/promtail_test.go b/pkg/promtail/promtail_test.go index 08b7b7e368d2..5fa0ffc19767 100644 --- a/pkg/promtail/promtail_test.go +++ b/pkg/promtail/promtail_test.go 
@@ -4,6 +4,7 @@ import ( "fmt" "io" "io/ioutil" + "math" "math/rand" "net/http" "os" @@ -435,7 +436,7 @@ type testServerHandler struct { func (h *testServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var req logproto.PushRequest - if _, err := util.ParseProtoReader(r.Context(), r.Body, &req, util.RawSnappy); err != nil { + if _, err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_table_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_table_client.go index 667b51110aa5..58c000df8325 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_table_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_table_client.go @@ -151,11 +151,18 @@ func (d dynamoTableClient) CreateTable(ctx context.Context, desc chunk.TableDesc KeyType: aws.String(dynamodb.KeyTypeRange), }, }, - ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ + } + + if desc.UseOnDemandIOMode { + input.BillingMode = aws.String(dynamodb.BillingModePayPerRequest) + } else { + input.BillingMode = aws.String(dynamodb.BillingModeProvisioned) + input.ProvisionedThroughput = &dynamodb.ProvisionedThroughput{ ReadCapacityUnits: aws.Int64(desc.ProvisionedRead), WriteCapacityUnits: aws.Int64(desc.ProvisionedWrite), - }, + } } + output, err := d.DynamoDB.CreateTableWithContext(ctx, input) if err != nil { return err diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/redis_cache.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/redis_cache.go index 878033b89a94..43e14dba3fc8 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/redis_cache.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/redis_cache.go @@ -141,14 +141,3 @@ func (c *RedisCache) ping(ctx context.Context) error { } return err } - -func redisStatusCode(err error) string { - switch err { - case nil: - return "200" - case redis.ErrNil: - return "404" - default: - return "500" - } -} diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/storage_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/storage_client.go index 4a87f49022b0..1b139bd719fe 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/storage_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/storage_client.go @@ -14,10 +14,6 @@ import ( "github.com/cortexproject/cortex/pkg/chunk/util" ) -const ( - maxRowReads = 100 -) - // Config for a StorageClient type Config struct { Addresses string `yaml:"addresses,omitempty"` diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go index 9a5ef76ea374..1d83a0600b0b 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go @@ -42,11 +42,9 @@ func labelNamesFromChunks(chunks []Chunk) []string { var result []string for _, c := range chunks { for _, l := range c.Metric { - if l.Name != model.MetricNameLabel { - if _, ok := keys[string(l.Name)]; !ok { - keys[string(l.Name)] = struct{}{} - result = append(result, string(l.Name)) - } + if _, ok := keys[string(l.Name)]; !ok { + keys[string(l.Name)] = struct{}{} + result = append(result, string(l.Name)) } } } diff --git 
a/vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go index f22aad0705d5..3680dc67efdf 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go @@ -55,7 +55,7 @@ func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index var store Store var err error switch cfg.Schema { - case "v9", "v10": + case "v9", "v10", "v11": store, err = newSeriesStore(storeCfg, schema, index, chunks, limits) default: store, err = newStore(storeCfg, schema, index, chunks, limits) diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/bigchunk.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/bigchunk.go index 66dff5b1cdee..8683ebc5a00b 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/bigchunk.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/bigchunk.go @@ -32,10 +32,10 @@ func newBigchunk() *bigchunk { return &bigchunk{} } -func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) { +func (b *bigchunk) Add(sample model.SamplePair) (Chunk, error) { if b.remainingSamples == 0 { if bigchunkSizeCapBytes > 0 && b.Size() > bigchunkSizeCapBytes { - return addToOverflowChunk(b, sample) + return addToOverflowChunk(sample) } if err := b.addNextChunk(sample.Timestamp); err != nil { return nil, err @@ -44,7 +44,7 @@ func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) { b.appender.Append(int64(sample.Timestamp), float64(sample.Value)) b.remainingSamples-- - return []Chunk{b}, nil + return nil, nil } // addNextChunk adds a new XOR "subchunk" to the internal list of chunks. diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/chunk.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/chunk.go index f36e4d3597f2..b31304714d1b 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/chunk.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/chunk.go @@ -31,19 +31,16 @@ const ChunkLen = 1024 var ( errChunkBoundsExceeded = errors.New("attempted access outside of chunk boundaries") - errAddedToEvictedChunk = errors.New("attempted to add sample to evicted chunk") ) // Chunk is the interface for all chunks. Chunks are generally not // goroutine-safe. type Chunk interface { // Add adds a SamplePair to the chunks, performs any necessary - // re-encoding, and adds any necessary overflow chunks. It returns the - // new version of the original chunk, followed by overflow chunks, if - // any. The first chunk returned might be the same as the original one - // or a newly allocated version. In any case, take the returned chunk as - // the relevant one and discard the original chunk. - Add(sample model.SamplePair) ([]Chunk, error) + // re-encoding, and creates any necessary overflow chunk. + // The returned Chunk is the overflow chunk if it was created. + // The returned Chunk is nil if the sample got appended to the same chunk. + Add(sample model.SamplePair) (Chunk, error) // NewIterator returns an iterator for the chunks. // The iterator passed as argument is for re-use. Depending on implementation, // the iterator can be re-used or a new iterator can be allocated. 
@@ -123,12 +120,13 @@ func RangeValues(it Iterator, in metric.Interval) ([]model.SamplePair, error) { // addToOverflowChunk is a utility function that creates a new chunk as overflow // chunk, adds the provided sample to it, and returns a chunk slice containing // the provided old chunk followed by the new overflow chunk. -func addToOverflowChunk(c Chunk, s model.SamplePair) ([]Chunk, error) { - overflowChunks, err := New().Add(s) +func addToOverflowChunk(s model.SamplePair) (Chunk, error) { + overflowChunk := New() + _, err := overflowChunk.Add(s) if err != nil { return nil, err } - return []Chunk{c, overflowChunks[0]}, nil + return overflowChunk, nil } // transcodeAndAdd is a utility function that transcodes the dst chunk into the @@ -139,27 +137,33 @@ func transcodeAndAdd(dst Chunk, src Chunk, s model.SamplePair) ([]Chunk, error) Ops.WithLabelValues(Transcode).Inc() var ( - head = dst - body, NewChunks []Chunk - err error + head = dst + newChunk Chunk + body = []Chunk{head} + err error ) it := src.NewIterator(nil) for it.Scan() { - if NewChunks, err = head.Add(it.Value()); err != nil { + if newChunk, err = head.Add(it.Value()); err != nil { return nil, err } - body = append(body, NewChunks[:len(NewChunks)-1]...) - head = NewChunks[len(NewChunks)-1] + if newChunk != nil { + body = append(body, newChunk) + head = newChunk + } } if it.Err() != nil { return nil, it.Err() } - if NewChunks, err = head.Add(s); err != nil { + if newChunk, err = head.Add(s); err != nil { return nil, err } - return append(body, NewChunks...), nil + if newChunk != nil { + body = append(body, newChunk) + } + return body, nil } // indexAccessor allows accesses to samples by index. diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/delta.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/delta.go deleted file mode 100644 index 120f734c363e..000000000000 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/delta.go +++ /dev/null @@ -1,355 +0,0 @@ -// This file was taken from Prometheus (https://github.com/prometheus/prometheus). -// The original license header is included below: -// -// Copyright 2014 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package encoding - -import ( - "encoding/binary" - "fmt" - "io" - "math" - - "github.com/prometheus/common/model" -) - -// The 21-byte header of a delta-encoded chunk looks like: -// -// - time delta bytes: 1 bytes -// - value delta bytes: 1 bytes -// - is integer: 1 byte -// - base time: 8 bytes -// - base value: 8 bytes -// - used buf bytes: 2 bytes -const ( - deltaHeaderBytes = 21 - - deltaHeaderTimeBytesOffset = 0 - deltaHeaderValueBytesOffset = 1 - deltaHeaderIsIntOffset = 2 - deltaHeaderBaseTimeOffset = 3 - deltaHeaderBaseValueOffset = 11 - deltaHeaderBufLenOffset = 19 -) - -// A deltaEncodedChunk adaptively stores sample timestamps and values with a -// delta encoding of various types (int, float) and bit widths. 
However, once 8 -// bytes would be needed to encode a delta value, a fall-back to the absolute -// numbers happens (so that timestamps are saved directly as int64 and values as -// float64). It implements the chunk interface. -type deltaEncodedChunk []byte - -// newDeltaEncodedChunk returns a newly allocated deltaEncodedChunk. -func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncodedChunk { - if tb < 1 { - panic("need at least 1 time delta byte") - } - if length < deltaHeaderBytes+16 { - panic(fmt.Errorf( - "chunk length %d bytes is insufficient, need at least %d", - length, deltaHeaderBytes+16, - )) - } - c := make(deltaEncodedChunk, deltaHeaderIsIntOffset+1, length) - - c[deltaHeaderTimeBytesOffset] = byte(tb) - c[deltaHeaderValueBytesOffset] = byte(vb) - if vb < d8 && isInt { // Only use int for fewer than 8 value delta bytes. - c[deltaHeaderIsIntOffset] = 1 - } else { - c[deltaHeaderIsIntOffset] = 0 - } - - return &c -} - -// Add implements chunk. -func (c deltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) { - // TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation. - if c.Len() == 0 { - c = c[:deltaHeaderBytes] - binary.LittleEndian.PutUint64(c[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp)) - binary.LittleEndian.PutUint64(c[deltaHeaderBaseValueOffset:], math.Float64bits(float64(s.Value))) - } - - remainingBytes := cap(c) - len(c) - sampleSize := c.sampleSize() - - // Do we generally have space for another sample in this chunk? If not, - // overflow into a new one. - if remainingBytes < sampleSize { - return addToOverflowChunk(&c, s) - } - - baseValue := c.baseValue() - dt := s.Timestamp - c.baseTime() - if dt < 0 { - return nil, fmt.Errorf("time delta is less than zero: %v", dt) - } - - dv := s.Value - baseValue - tb := c.timeBytes() - vb := c.valueBytes() - isInt := c.isInt() - - // If the new sample is incompatible with the current encoding, reencode the - // existing chunk data into new chunk(s). - - ntb, nvb, nInt := tb, vb, isInt - if isInt && !isInt64(dv) { - // int->float. - nvb = d4 - nInt = false - } else if !isInt && vb == d4 && baseValue+model.SampleValue(float32(dv)) != s.Value { - // float32->float64. - nvb = d8 - } else { - if tb < d8 { - // Maybe more bytes for timestamp. - ntb = max(tb, bytesNeededForUnsignedTimestampDelta(dt)) - } - if c.isInt() && vb < d8 { - // Maybe more bytes for sample value. - nvb = max(vb, bytesNeededForIntegerSampleValueDelta(dv)) - } - } - if tb != ntb || vb != nvb || isInt != nInt { - if len(c)*2 < cap(c) { - return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s) - } - // Chunk is already half full. Better create a new one and save the transcoding efforts. - return addToOverflowChunk(&c, s) - } - - offset := len(c) - c = c[:offset+sampleSize] - - switch tb { - case d1: - c[offset] = byte(dt) - case d2: - binary.LittleEndian.PutUint16(c[offset:], uint16(dt)) - case d4: - binary.LittleEndian.PutUint32(c[offset:], uint32(dt)) - case d8: - // Store the absolute value (no delta) in case of d8. - binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp)) - default: - return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb) - } - - offset += int(tb) - - if c.isInt() { - switch vb { - case d0: - // No-op. Constant value is stored as base value. 
- case d1: - c[offset] = byte(int8(dv)) - case d2: - binary.LittleEndian.PutUint16(c[offset:], uint16(int16(dv))) - case d4: - binary.LittleEndian.PutUint32(c[offset:], uint32(int32(dv))) - // d8 must not happen. Those samples are encoded as float64. - default: - return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb) - } - } else { - switch vb { - case d4: - binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(dv))) - case d8: - // Store the absolute value (no delta) in case of d8. - binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value))) - default: - return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb) - } - } - return []Chunk{&c}, nil -} - -func (c *deltaEncodedChunk) Slice(_, _ model.Time) Chunk { - return c -} - -// NewIterator implements chunk. -func (c *deltaEncodedChunk) NewIterator(_ Iterator) Iterator { - return newIndexAccessingChunkIterator(c.Len(), &deltaEncodedIndexAccessor{ - c: *c, - baseT: c.baseTime(), - baseV: c.baseValue(), - tBytes: c.timeBytes(), - vBytes: c.valueBytes(), - isInt: c.isInt(), - }) -} - -// Marshal implements chunk. -func (c deltaEncodedChunk) Marshal(w io.Writer) error { - if len(c) > math.MaxUint16 { - panic("chunk buffer length would overflow a 16 bit uint.") - } - binary.LittleEndian.PutUint16(c[deltaHeaderBufLenOffset:], uint16(len(c))) - - n, err := w.Write(c[:cap(c)]) - if err != nil { - return err - } - if n != cap(c) { - return fmt.Errorf("wanted to write %d bytes, wrote %d", cap(c), n) - } - return nil -} - -// UnmarshalFromBuf implements chunk. -func (c *deltaEncodedChunk) UnmarshalFromBuf(buf []byte) error { - *c = (*c)[:cap(*c)] - copy(*c, buf) - return c.setLen() -} - -// setLen sets the length of the underlying slice and performs some sanity checks. -func (c *deltaEncodedChunk) setLen() error { - l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:]) - if int(l) > cap(*c) { - return fmt.Errorf("delta chunk length exceeded during unmarshalling: %d", l) - } - if int(l) < deltaHeaderBytes { - return fmt.Errorf("delta chunk length less than header size: %d < %d", l, deltaHeaderBytes) - } - switch c.timeBytes() { - case d1, d2, d4, d8: - // Pass. - default: - return fmt.Errorf("invalid number of time bytes in delta chunk: %d", c.timeBytes()) - } - switch c.valueBytes() { - case d0, d1, d2, d4, d8: - // Pass. - default: - return fmt.Errorf("invalid number of value bytes in delta chunk: %d", c.valueBytes()) - } - *c = (*c)[:l] - return nil -} - -// Encoding implements chunk. -func (c deltaEncodedChunk) Encoding() Encoding { return Delta } - -// Utilization implements chunk. -func (c deltaEncodedChunk) Utilization() float64 { - return float64(len(c)) / float64(cap(c)) -} - -func (c deltaEncodedChunk) timeBytes() deltaBytes { - return deltaBytes(c[deltaHeaderTimeBytesOffset]) -} - -func (c deltaEncodedChunk) valueBytes() deltaBytes { - return deltaBytes(c[deltaHeaderValueBytesOffset]) -} - -func (c deltaEncodedChunk) isInt() bool { - return c[deltaHeaderIsIntOffset] == 1 -} - -func (c deltaEncodedChunk) baseTime() model.Time { - return model.Time(binary.LittleEndian.Uint64(c[deltaHeaderBaseTimeOffset:])) -} - -func (c deltaEncodedChunk) baseValue() model.SampleValue { - return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[deltaHeaderBaseValueOffset:]))) -} - -func (c deltaEncodedChunk) sampleSize() int { - return int(c.timeBytes() + c.valueBytes()) -} - -// Len implements Chunk. Runs in constant time. 
-func (c deltaEncodedChunk) Len() int { - if len(c) < deltaHeaderBytes { - return 0 - } - return (len(c) - deltaHeaderBytes) / c.sampleSize() -} - -func (c deltaEncodedChunk) Size() int { - return len(c) -} - -// deltaEncodedIndexAccessor implements indexAccessor. -type deltaEncodedIndexAccessor struct { - c deltaEncodedChunk - baseT model.Time - baseV model.SampleValue - tBytes, vBytes deltaBytes - isInt bool - lastErr error -} - -func (acc *deltaEncodedIndexAccessor) err() error { - return acc.lastErr -} - -func (acc *deltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time { - offset := deltaHeaderBytes + idx*int(acc.tBytes+acc.vBytes) - - switch acc.tBytes { - case d1: - return acc.baseT + model.Time(uint8(acc.c[offset])) - case d2: - return acc.baseT + model.Time(binary.LittleEndian.Uint16(acc.c[offset:])) - case d4: - return acc.baseT + model.Time(binary.LittleEndian.Uint32(acc.c[offset:])) - case d8: - // Take absolute value for d8. - return model.Time(binary.LittleEndian.Uint64(acc.c[offset:])) - default: - acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes) - return model.Earliest - } -} - -func (acc *deltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue { - offset := deltaHeaderBytes + idx*int(acc.tBytes+acc.vBytes) + int(acc.tBytes) - - if acc.isInt { - switch acc.vBytes { - case d0: - return acc.baseV - case d1: - return acc.baseV + model.SampleValue(int8(acc.c[offset])) - case d2: - return acc.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[offset:]))) - case d4: - return acc.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[offset:]))) - // No d8 for ints. - default: - acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes) - return 0 - } - } else { - switch acc.vBytes { - case d4: - return acc.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[offset:]))) - case d8: - // Take absolute value for d8. - return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:]))) - default: - acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes) - return 0 - } - } -} diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/doubledelta.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/doubledelta.go index b9ca6f0402b8..683ce844eef6 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/doubledelta.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/doubledelta.go @@ -84,26 +84,28 @@ func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doub } // Add implements chunk. -func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) { +func (c *doubleDeltaEncodedChunk) Add(s model.SamplePair) (Chunk, error) { // TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation. if c.Len() == 0 { - return c.addFirstSample(s), nil + c.addFirstSample(s) + return nil, nil } tb := c.timeBytes() vb := c.valueBytes() if c.Len() == 1 { - return c.addSecondSample(s, tb, vb) + err := c.addSecondSample(s, tb, vb) + return nil, err } - remainingBytes := cap(c) - len(c) + remainingBytes := cap(*c) - len(*c) sampleSize := c.sampleSize() // Do we generally have space for another sample in this chunk? If not, // overflow into a new one. 
if remainingBytes < sampleSize { - return addToOverflowChunk(&c, s) + return addToOverflowChunk(s) } projectedTime := c.baseTime() + model.Time(c.Len())*c.baseTimeDelta() @@ -133,26 +135,47 @@ func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) { } } if tb != ntb || vb != nvb || c.isInt() != nInt { - if len(c)*2 < cap(c) { - return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s) + if len(*c)*2 < cap(*c) { + result, err := transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(*c)), c, s) + if err != nil { + return nil, err + } + // We cannot handle >2 chunks returned as we can only return 1 chunk. + // Ideally there won't be >2 chunks, but if it happens to be >2, + // we fall through to perform `addToOverflowChunk` instead. + if len(result) == 1 { + // Replace the current chunk with the new bigger chunk. + c0 := result[0].(*doubleDeltaEncodedChunk) + *c = *c0 + return nil, nil + } else if len(result) == 2 { + // Replace the current chunk with the new bigger chunk + // and return the additional chunk. + c0 := result[0].(*doubleDeltaEncodedChunk) + c1 := result[1].(*doubleDeltaEncodedChunk) + *c = *c0 + return c1, nil + } } + // Chunk is already half full. Better create a new one and save the transcoding efforts. - return addToOverflowChunk(&c, s) + // We also perform this if `transcodeAndAdd` resulted in >2 chunks. + return addToOverflowChunk(s) } - offset := len(c) - c = c[:offset+sampleSize] + offset := len(*c) + (*c) = (*c)[:offset+sampleSize] switch tb { case d1: - c[offset] = byte(ddt) + (*c)[offset] = byte(ddt) case d2: - binary.LittleEndian.PutUint16(c[offset:], uint16(ddt)) + binary.LittleEndian.PutUint16((*c)[offset:], uint16(ddt)) case d4: - binary.LittleEndian.PutUint32(c[offset:], uint32(ddt)) + binary.LittleEndian.PutUint32((*c)[offset:], uint32(ddt)) case d8: // Store the absolute value (no delta) in case of d8. - binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp)) + binary.LittleEndian.PutUint64((*c)[offset:], uint64(s.Timestamp)) default: return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb) } @@ -164,11 +187,11 @@ func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) { case d0: // No-op. Constant delta is stored as base value. case d1: - c[offset] = byte(int8(ddv)) + (*c)[offset] = byte(int8(ddv)) case d2: - binary.LittleEndian.PutUint16(c[offset:], uint16(int16(ddv))) + binary.LittleEndian.PutUint16((*c)[offset:], uint16(int16(ddv))) case d4: - binary.LittleEndian.PutUint32(c[offset:], uint32(int32(ddv))) + binary.LittleEndian.PutUint32((*c)[offset:], uint32(int32(ddv))) // d8 must not happen. Those samples are encoded as float64. default: return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb) @@ -176,15 +199,15 @@ func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) { } else { switch vb { case d4: - binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(ddv))) + binary.LittleEndian.PutUint32((*c)[offset:], math.Float32bits(float32(ddv))) case d8: // Store the absolute value (no delta) in case of d8. - binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value))) + binary.LittleEndian.PutUint64((*c)[offset:], math.Float64bits(float64(s.Value))) default: return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb) } } - return []Chunk{&c}, nil + return nil, nil } // FirstTime implements chunk. 
@@ -243,15 +266,15 @@ func (c doubleDeltaEncodedChunk) MarshalToBuf(buf []byte) error { // UnmarshalFromBuf implements chunk. func (c *doubleDeltaEncodedChunk) UnmarshalFromBuf(buf []byte) error { - *c = (*c)[:cap(*c)] - copy(*c, buf) + (*c) = (*c)[:cap((*c))] + copy((*c), buf) return c.setLen() } // setLen sets the length of the underlying slice and performs some sanity checks. func (c *doubleDeltaEncodedChunk) setLen() error { l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:]) - if int(l) > cap(*c) { + if int(l) > cap((*c)) { return fmt.Errorf("doubledelta chunk length exceeded during unmarshalling: %d", l) } if int(l) < doubleDeltaHeaderMinBytes { @@ -269,7 +292,7 @@ func (c *doubleDeltaEncodedChunk) setLen() error { default: return fmt.Errorf("invalid number of value bytes in doubledelta chunk: %d", c.valueBytes()) } - *c = (*c)[:l] + (*c) = (*c)[:l] return nil } @@ -356,40 +379,39 @@ func (c doubleDeltaEncodedChunk) isInt() bool { // addFirstSample is a helper method only used by c.add(). It adds timestamp and // value as base time and value. -func (c doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) []Chunk { - c = c[:doubleDeltaHeaderBaseValueOffset+8] +func (c *doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) { + (*c) = (*c)[:doubleDeltaHeaderBaseValueOffset+8] binary.LittleEndian.PutUint64( - c[doubleDeltaHeaderBaseTimeOffset:], + (*c)[doubleDeltaHeaderBaseTimeOffset:], uint64(s.Timestamp), ) binary.LittleEndian.PutUint64( - c[doubleDeltaHeaderBaseValueOffset:], + (*c)[doubleDeltaHeaderBaseValueOffset:], math.Float64bits(float64(s.Value)), ) - return []Chunk{&c} } // addSecondSample is a helper method only used by c.add(). It calculates the // base delta from the provided sample and adds it to the chunk. -func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) ([]Chunk, error) { +func (c *doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) error { baseTimeDelta := s.Timestamp - c.baseTime() if baseTimeDelta < 0 { - return nil, fmt.Errorf("base time delta is less than zero: %v", baseTimeDelta) + return fmt.Errorf("base time delta is less than zero: %v", baseTimeDelta) } - c = c[:doubleDeltaHeaderBytes] + (*c) = (*c)[:doubleDeltaHeaderBytes] if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 { // If already the base delta needs d8 (or we are at d8 // already, anyway), we better encode this timestamp // directly rather than as a delta and switch everything // to d8. - c[doubleDeltaHeaderTimeBytesOffset] = byte(d8) + (*c)[doubleDeltaHeaderTimeBytesOffset] = byte(d8) binary.LittleEndian.PutUint64( - c[doubleDeltaHeaderBaseTimeDeltaOffset:], + (*c)[doubleDeltaHeaderBaseTimeDeltaOffset:], uint64(s.Timestamp), ) } else { binary.LittleEndian.PutUint64( - c[doubleDeltaHeaderBaseTimeDeltaOffset:], + (*c)[doubleDeltaHeaderBaseTimeDeltaOffset:], uint64(baseTimeDelta), ) } @@ -400,19 +422,19 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb delt // if we are at d8 already, anyway), we better encode // this value directly rather than as a delta and switch // everything to d8. 
- c[doubleDeltaHeaderValueBytesOffset] = byte(d8) - c[doubleDeltaHeaderIsIntOffset] = 0 + (*c)[doubleDeltaHeaderValueBytesOffset] = byte(d8) + (*c)[doubleDeltaHeaderIsIntOffset] = 0 binary.LittleEndian.PutUint64( - c[doubleDeltaHeaderBaseValueDeltaOffset:], + (*c)[doubleDeltaHeaderBaseValueDeltaOffset:], math.Float64bits(float64(s.Value)), ) } else { binary.LittleEndian.PutUint64( - c[doubleDeltaHeaderBaseValueDeltaOffset:], + (*c)[doubleDeltaHeaderBaseValueDeltaOffset:], math.Float64bits(float64(baseValueDelta)), ) } - return []Chunk{&c}, nil + return nil } // doubleDeltaEncodedIndexAccessor implements indexAccessor. diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/factory.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/factory.go index 83ab9f2602d5..5ac314d9d0fd 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/factory.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/factory.go @@ -1,6 +1,7 @@ package encoding import ( + "errors" "flag" "fmt" "strconv" @@ -26,6 +27,15 @@ func (Config) RegisterFlags(f *flag.FlagSet) { flag.IntVar(&bigchunkSizeCapBytes, "store.bigchunk-size-cap-bytes", bigchunkSizeCapBytes, "When using bigchunk encoding, start a new bigchunk if over this size (0 = unlimited)") } +// Validate errors out if the encoding is set to Delta. +func (Config) Validate() error { + if DefaultEncoding == Delta { + // Delta is deprecated. + return errors.New("delta encoding is deprecated") + } + return nil +} + // String implements flag.Value. func (e Encoding) String() string { if known, found := encodings[e]; found { @@ -35,7 +45,8 @@ func (e Encoding) String() string { } const ( - // Delta encoding + // Delta encoding is no longer supported and will be automatically changed to DoubleDelta. + // It still exists here to not change the `ingester.chunk-encoding` flag values. Delta Encoding = iota // DoubleDelta encoding DoubleDelta @@ -51,12 +62,6 @@ type encoding struct { } var encodings = map[Encoding]encoding{ - Delta: { - Name: "Delta", - New: func() Chunk { - return newDeltaEncodedChunk(d1, d0, true, ChunkLen) - }, - }, DoubleDelta: { Name: "DoubleDelta", New: func() Chunk { diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/varbit.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/varbit.go index c2663fe9e247..2df8abc48271 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/varbit.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/varbit.go @@ -260,17 +260,20 @@ func newVarbitChunk(enc varbitValueEncoding) *varbitChunk { } // Add implements chunk. -func (c *varbitChunk) Add(s model.SamplePair) ([]Chunk, error) { +func (c *varbitChunk) Add(s model.SamplePair) (Chunk, error) { offset := c.nextSampleOffset() switch { case c.closed(): - return addToOverflowChunk(c, s) + return addToOverflowChunk(s) case offset > varbitNextSampleBitOffsetThreshold: - return c.addLastSample(s), nil + c.addLastSample(s) + return nil, nil case offset == varbitFirstSampleBitOffset: - return c.addFirstSample(s), nil + c.addFirstSample(s) + return nil, nil case offset == varbitSecondSampleBitOffset: - return c.addSecondSample(s) + err := c.addSecondSample(s) + return nil, err } return c.addLaterSample(s, offset) } @@ -492,7 +495,7 @@ func (c varbitChunk) setLastSample(s model.SamplePair) { // addFirstSample is a helper method only used by c.add(). It adds timestamp and // value as base time and value. 
-func (c *varbitChunk) addFirstSample(s model.SamplePair) []Chunk { +func (c *varbitChunk) addFirstSample(s model.SamplePair) { binary.BigEndian.PutUint64( (*c)[varbitFirstTimeOffset:], uint64(s.Timestamp), @@ -503,21 +506,21 @@ func (c *varbitChunk) addFirstSample(s model.SamplePair) []Chunk { ) c.setLastSample(s) // To simplify handling of single-sample chunks. c.setNextSampleOffset(varbitSecondSampleBitOffset) - return []Chunk{c} } // addSecondSample is a helper method only used by c.add(). It calculates the // first time delta from the provided sample and adds it to the chunk together // with the provided sample as the last sample. -func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]Chunk, error) { +func (c *varbitChunk) addSecondSample(s model.SamplePair) error { firstTimeDelta := s.Timestamp - c.firstTime() if firstTimeDelta < 0 { - return nil, fmt.Errorf("first Δt is less than zero: %v", firstTimeDelta) + return fmt.Errorf("first Δt is less than zero: %v", firstTimeDelta) } if firstTimeDelta > varbitMaxTimeDelta { // A time delta too great. Still, we can add it as a last sample // before overflowing. - return c.addLastSample(s), nil + c.addLastSample(s) + return nil } (*c)[varbitFirstTimeDeltaOffset] = byte(firstTimeDelta >> 16) (*c)[varbitFirstTimeDeltaOffset+1] = byte(firstTimeDelta >> 8) @@ -529,7 +532,7 @@ func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]Chunk, error) { c.setLastSample(s) c.setNextSampleOffset(varbitThirdSampleBitOffset) - return []Chunk{c}, nil + return nil } // addLastSample is a helper method only used by c.add() and in other helper @@ -538,15 +541,15 @@ func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]Chunk, error) { // adds the very last sample added to this chunk ever, while setLastSample sets // the sample most recently added to the chunk so that it can be used for the // calculations required to add the next sample. -func (c *varbitChunk) addLastSample(s model.SamplePair) []Chunk { +func (c *varbitChunk) addLastSample(s model.SamplePair) { c.setLastSample(s) (*c)[varbitFlagOffset] |= 0x80 - return []Chunk{c} + return } // addLaterSample is a helper method only used by c.add(). It adds a third or // later sample. -func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]Chunk, error) { +func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) (Chunk, error) { var ( lastTime = c.lastTime() lastTimeDelta = c.lastTimeDelta() @@ -564,39 +567,88 @@ func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]Chunk if newTimeDelta > varbitMaxTimeDelta { // A time delta too great. Still, we can add it as a last sample // before overflowing. - return c.addLastSample(s), nil + c.addLastSample(s) + return nil, nil } // Analyze worst case, does it fit? If not, set new sample as the last. if int(offset)+varbitWorstCaseBitsPerSample[encoding] > ChunkLen*8 { - return c.addLastSample(s), nil + c.addLastSample(s) + return nil, nil } // Transcoding/overflow decisions first. if encoding == varbitZeroEncoding && s.Value != lastValue { // Cannot go on with zero encoding. - if offset > ChunkLen*4 { - // Chunk already half full. Don't transcode, overflow instead. - return addToOverflowChunk(c, s) - } - if isInt32(s.Value - lastValue) { - // Trying int encoding looks promising. - return transcodeAndAdd(newVarbitChunk(varbitIntDoubleDeltaEncoding), c, s) + if offset <= ChunkLen*4 { + var result []Chunk + var err error + if isInt32(s.Value - lastValue) { + // Trying int encoding looks promising. 
+ result, err = transcodeAndAdd(newVarbitChunk(varbitIntDoubleDeltaEncoding), c, s) + } else { + result, err = transcodeAndAdd(newVarbitChunk(varbitXOREncoding), c, s) + } + if err != nil { + return nil, err + } + + // We cannot handle >2 chunks returned as we can only return 1 chunk. + // Ideally there won't be >2 chunks, but if it happens to be >2, + // we fall through to perform `addToOverflowChunk` instead. + if len(result) == 1 { + // Replace the current chunk with the new bigger chunk. + c0 := result[0].(*varbitChunk) + *c = *c0 + return nil, nil + } else if len(result) == 2 { + // Replace the current chunk with the new bigger chunk + // and return the additional chunk. + c0 := result[0].(*varbitChunk) + c1 := result[1].(*varbitChunk) + *c = *c0 + return c1, nil + } } - return transcodeAndAdd(newVarbitChunk(varbitXOREncoding), c, s) + + // Chunk is already half full. Better create a new one and save the transcoding efforts. + // We also perform this if `transcodeAndAdd` resulted in >2 chunks. + return addToOverflowChunk(s) } if encoding == varbitIntDoubleDeltaEncoding && !isInt32(s.Value-lastValue) { // Cannot go on with int encoding. - if offset > ChunkLen*4 { - // Chunk already half full. Don't transcode, overflow instead. - return addToOverflowChunk(c, s) + if offset <= ChunkLen*4 { + result, err := transcodeAndAdd(newVarbitChunk(varbitXOREncoding), c, s) + if err != nil { + return nil, err + } + // We cannot handle >2 chunks returned as we can only return 1 chunk. + // Ideally there won't be >2 chunks, but if it happens to be >2, + // we fall through to perform `addToOverflowChunk` instead. + if len(result) == 1 { + // Replace the current chunk with the new bigger chunk. + c0 := result[0].(*varbitChunk) + *c = *c0 + return nil, nil + } else if len(result) == 2 { + // Replace the current chunk with the new bigger chunk + // and return the additional chunk. + c0 := result[0].(*varbitChunk) + c1 := result[1].(*varbitChunk) + *c = *c0 + return c1, nil + } } - return transcodeAndAdd(newVarbitChunk(varbitXOREncoding), c, s) + + // Chunk is already half full. Better create a new one and save the transcoding efforts. + // We also perform this if `transcodeAndAdd` resulted in >2 chunks. 
+ return addToOverflowChunk(s) } offset, overflow := c.addDDTime(offset, lastTimeDelta, newTimeDelta) if overflow { - return c.addLastSample(s), nil + c.addLastSample(s) + return nil, nil } switch encoding { case varbitZeroEncoding: @@ -613,7 +665,7 @@ func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]Chunk c.setNextSampleOffset(offset) c.setLastSample(s) - return []Chunk{c}, nil + return nil, nil } func (c varbitChunk) prepForThirdSample( diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go index 0dfe256d8af2..d18a2515e0c3 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go @@ -8,6 +8,7 @@ import ( "flag" "fmt" "strings" + "time" "cloud.google.com/go/bigtable" ot "github.com/opentracing/opentracing-go" @@ -26,7 +27,6 @@ const ( column = "c" separator = "\000" maxRowReads = 100 - null = string('\xff') ) // Config for a StorageClient @@ -38,12 +38,17 @@ type Config struct { ColumnKey bool DistributeKeys bool + + TableCacheEnabled bool + TableCacheExpiration time.Duration } // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.Project, "bigtable.project", "", "Bigtable project ID.") f.StringVar(&cfg.Instance, "bigtable.instance", "", "Bigtable instance ID.") + f.BoolVar(&cfg.TableCacheEnabled, "bigtable.table-cache.enabled", true, "If enabled, once a table's info is fetched, it is cached.") + f.DurationVar(&cfg.TableCacheExpiration, "bigtable.table-cache.expiration", 30*time.Minute, "Duration to cache tables before checking again.") cfg.GRPCClientConfig.RegisterFlags("bigtable", f) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/table_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/table_client.go index bb6c7e2ca70a..dd315bdc91b3 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/table_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/table_client.go @@ -2,6 +2,7 @@ package gcp import ( "context" + "time" "google.golang.org/grpc/codes" @@ -15,6 +16,9 @@ import ( type tableClient struct { cfg Config client *bigtable.AdminClient + + tableInfo map[string]*bigtable.TableInfo + tableExpiration time.Time } // NewTableClient returns a new TableClient. @@ -27,25 +31,37 @@ func NewTableClient(ctx context.Context, cfg Config) (chunk.TableClient, error) return &tableClient{ cfg: cfg, client: client, + + tableInfo: map[string]*bigtable.TableInfo{}, }, nil } +// ListTables lists all of the correctly specified cortex tables in bigtable func (c *tableClient) ListTables(ctx context.Context) ([]string, error) { tables, err := c.client.Tables(ctx) if err != nil { return nil, errors.Wrap(err, "client.Tables") } - // Check each table has the right column family. If not, omit it. 
+ if c.tableExpiration.Before(time.Now()) { + c.tableInfo = map[string]*bigtable.TableInfo{} + c.tableExpiration = time.Now().Add(c.cfg.TableCacheExpiration) + } + output := make([]string, 0, len(tables)) for _, table := range tables { - info, err := c.client.TableInfo(ctx, table) - if err != nil { - return nil, errors.Wrap(err, "client.TableInfo") + info, exists := c.tableInfo[table] + if !c.cfg.TableCacheEnabled || !exists { + info, err = c.client.TableInfo(ctx, table) + if err != nil { + return nil, errors.Wrap(err, "client.TableInfo") + } } + // Check each table has the right column family. If not, omit it. if hasColumnFamily(info.FamilyInfos) { output = append(output, table) + c.tableInfo[table] = info } } @@ -86,6 +102,7 @@ func (c *tableClient) DeleteTable(ctx context.Context, name string) error { if err := c.client.DeleteTable(ctx, name); err != nil { return errors.Wrap(err, "client.DeleteTable") } + delete(c.tableInfo, name) return nil } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/inmemory_storage_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/inmemory_storage_client.go index 56251a19f328..80464e3a7ccd 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/inmemory_storage_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/inmemory_storage_client.go @@ -165,6 +165,7 @@ func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error { itemComponents := decodeRangeKey(items[i].rangeValue) if !bytes.Equal(itemComponents[3], metricNameRangeKeyV1) && !bytes.Equal(itemComponents[3], seriesRangeKeyV1) && + !bytes.Equal(itemComponents[3], labelNamesRangeKeyV1) && !bytes.Equal(itemComponents[3], labelSeriesRangeKeyV1) { return fmt.Errorf("Dupe write") } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/local/boltdb_index_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/local/boltdb_index_client.go index dcba16752449..9ac30985dbab 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/local/boltdb_index_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/local/boltdb_index_client.go @@ -22,7 +22,6 @@ var bucketName = []byte("index") const ( separator = "\000" - null = string('\xff') dbReloadPeriod = 10 * time.Minute ) diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/schema.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/schema.go index 4e3142c5e501..778bbb3e4c86 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/schema.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/schema.go @@ -7,6 +7,7 @@ import ( "fmt" "strings" + jsoniter "github.com/json-iterator/go" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" ) @@ -22,6 +23,8 @@ var ( // For v9 schema seriesRangeKeyV1 = []byte{'7'} labelSeriesRangeKeyV1 = []byte{'8'} + // For v11 schema + labelNamesRangeKeyV1 = []byte{'9'} // ErrNotSupported when a schema doesn't support that particular lookup. ErrNotSupported = errors.New("not supported") @@ -45,6 +48,8 @@ type Schema interface { // If the query resulted in series IDs, use this method to find chunks. GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) + // Returns queries to retrieve all label names of multiple series by id. 
+ GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) } // IndexQuery describes a query for entries @@ -78,9 +83,11 @@ type IndexEntry struct { Value []byte } +type schemaBucketsFunc func(from, through model.Time, userID string) []Bucket + // schema implements Schema given a bucketing function and a set of range key callbacks type schema struct { - buckets func(from, through model.Time, userID string) []Bucket + buckets schemaBucketsFunc entries entries } @@ -194,6 +201,20 @@ func (s schema) GetChunksForSeries(from, through model.Time, userID string, seri return result, nil } +func (s schema) GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) { + var result []IndexQuery + + buckets := s.buckets(from, through, userID) + for _, bucket := range buckets { + entries, err := s.entries.GetLabelNamesForSeries(bucket, seriesID) + if err != nil { + return nil, err + } + result = append(result, entries...) + } + return result, nil +} + type entries interface { GetWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) GetLabelWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) @@ -203,6 +224,7 @@ type entries interface { GetReadMetricLabelQueries(bucket Bucket, metricName string, labelName string) ([]IndexQuery, error) GetReadMetricLabelValueQueries(bucket Bucket, metricName string, labelName string, labelValue string) ([]IndexQuery, error) GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error) + GetLabelNamesForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error) } // original entries: @@ -274,6 +296,10 @@ func (originalEntries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, err return nil, ErrNotSupported } +func (originalEntries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) { + return nil, ErrNotSupported +} + // v3Schema went to base64 encoded label values & a version ID - range key: