diff --git a/cache/cache_test.go b/cache/cache_test.go index 8ef1d4c93319..37664adad06a 100644 --- a/cache/cache_test.go +++ b/cache/cache_test.go @@ -27,10 +27,13 @@ func fillCache(t *testing.T, cache cache.Cache) ([]string, []chunk.Chunk) { chunks := []chunk.Chunk{} for i := 0; i < 100; i++ { ts := model.TimeFromUnix(int64(i * chunkLen)) - promChunk, _ := prom_chunk.New().Add(model.SamplePair{ + promChunk := prom_chunk.New() + nc, err := promChunk.Add(model.SamplePair{ Timestamp: ts, Value: model.SampleValue(i), }) + require.NoError(t, err) + require.Nil(t, nc) c := chunk.NewChunk( userID, model.Fingerprint(1), @@ -38,12 +41,12 @@ func fillCache(t *testing.T, cache cache.Cache) ([]string, []chunk.Chunk) { {Name: model.MetricNameLabel, Value: "foo"}, {Name: "bar", Value: "baz"}, }, - promChunk[0], + promChunk, ts, ts.Add(chunkLen), ) - err := c.Encode() + err = c.Encode() require.NoError(t, err) buf, err := c.Encoded() require.NoError(t, err) diff --git a/chunk_store_test.go b/chunk_store_test.go index 80d380d7aaa3..00e943bff2a6 100644 --- a/chunk_store_test.go +++ b/chunk_store_test.go @@ -579,10 +579,13 @@ func TestChunkStoreRandom(t *testing.T) { const chunkLen = 2 * 3600 // in seconds for i := 0; i < 100; i++ { ts := model.TimeFromUnix(int64(i * chunkLen)) - chunks, _ := encoding.New().Add(model.SamplePair{ + ch := encoding.New() + nc, err := ch.Add(model.SamplePair{ Timestamp: ts, Value: model.SampleValue(float64(i)), }) + require.NoError(t, err) + require.Nil(t, nc) chunk := NewChunk( userID, model.Fingerprint(1), @@ -590,11 +593,11 @@ func TestChunkStoreRandom(t *testing.T) { {Name: labels.MetricName, Value: "foo"}, {Name: "bar", Value: "baz"}, }, - chunks[0], + ch, ts, ts.Add(chunkLen*time.Second).Add(-1*time.Second), ) - err := chunk.Encode() + err = chunk.Encode() require.NoError(t, err) err = store.Put(ctx, []Chunk{chunk}) require.NoError(t, err) @@ -644,10 +647,13 @@ func TestChunkStoreLeastRead(t *testing.T) { const chunkLen = 60 // in seconds for i 
:= 0; i < 24; i++ { ts := model.TimeFromUnix(int64(i * chunkLen)) - chunks, _ := encoding.New().Add(model.SamplePair{ + ch := encoding.New() + nc, err := ch.Add(model.SamplePair{ Timestamp: ts, Value: model.SampleValue(float64(i)), }) + require.NoError(t, err) + require.Nil(t, nc) chunk := NewChunk( userID, model.Fingerprint(1), @@ -655,12 +661,12 @@ func TestChunkStoreLeastRead(t *testing.T) { {Name: labels.MetricName, Value: "foo"}, {Name: "bar", Value: "baz"}, }, - chunks[0], + ch, ts, ts.Add(chunkLen*time.Second), ) t.Logf("Loop %d", i) - err := chunk.Encode() + err = chunk.Encode() require.NoError(t, err) err = store.Put(ctx, []Chunk{chunk}) require.NoError(t, err) diff --git a/chunk_test.go b/chunk_test.go index 8a10bb72a1d4..c040ba31c3dc 100644 --- a/chunk_test.go +++ b/chunk_test.go @@ -36,11 +36,13 @@ func dummyChunkForEncoding(now model.Time, metric labels.Labels, enc encoding.En c, _ := encoding.NewForEncoding(enc) for i := 0; i < samples; i++ { t := time.Duration(i) * 15 * time.Second - cs, err := c.Add(model.SamplePair{Timestamp: now.Add(t), Value: 0}) + nc, err := c.Add(model.SamplePair{Timestamp: now.Add(t), Value: 0}) if err != nil { panic(err) } - c = cs[0] + if nc != nil { + panic("returned chunk was not nil") + } } chunk := NewChunk( userID, diff --git a/encoding/bigchunk.go b/encoding/bigchunk.go index 66dff5b1cdee..8683ebc5a00b 100644 --- a/encoding/bigchunk.go +++ b/encoding/bigchunk.go @@ -32,10 +32,10 @@ func newBigchunk() *bigchunk { return &bigchunk{} } -func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) { +func (b *bigchunk) Add(sample model.SamplePair) (Chunk, error) { if b.remainingSamples == 0 { if bigchunkSizeCapBytes > 0 && b.Size() > bigchunkSizeCapBytes { - return addToOverflowChunk(b, sample) + return addToOverflowChunk(sample) } if err := b.addNextChunk(sample.Timestamp); err != nil { return nil, err @@ -44,7 +44,7 @@ func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) { 
b.appender.Append(int64(sample.Timestamp), float64(sample.Value)) b.remainingSamples-- - return []Chunk{b}, nil + return nil, nil } // addNextChunk adds a new XOR "subchunk" to the internal list of chunks. diff --git a/encoding/bigchunk_test.go b/encoding/bigchunk_test.go index 7e5e6e8761cc..b0c2db12d846 100644 --- a/encoding/bigchunk_test.go +++ b/encoding/bigchunk_test.go @@ -12,12 +12,12 @@ import ( func TestSliceBiggerChunk(t *testing.T) { var c Chunk = newBigchunk() for i := 0; i < 12*3600/15; i++ { - cs, err := c.Add(model.SamplePair{ + nc, err := c.Add(model.SamplePair{ Timestamp: model.Time(i * step), Value: model.SampleValue(i), }) require.NoError(t, err) - c = cs[0] + require.Nil(t, nc) } // Test for when the slice aligns perfectly with the sub-chunk boundaries. @@ -69,12 +69,12 @@ func BenchmarkBiggerChunkMemory(b *testing.B) { for i := 0; i < b.N; i++ { var c Chunk = newBigchunk() for i := 0; i < 12*3600/15; i++ { - cs, err := c.Add(model.SamplePair{ + nc, err := c.Add(model.SamplePair{ Timestamp: model.Time(i * step), Value: model.SampleValue(i), }) require.NoError(b, err) - c = cs[0] + require.Nil(b, nc) } c.(*bigchunk).printSize() diff --git a/encoding/chunk.go b/encoding/chunk.go index f36e4d3597f2..c5963ee5c34f 100644 --- a/encoding/chunk.go +++ b/encoding/chunk.go @@ -38,12 +38,10 @@ var ( // goroutine-safe. type Chunk interface { // Add adds a SamplePair to the chunks, performs any necessary - // re-encoding, and adds any necessary overflow chunks. It returns the - // new version of the original chunk, followed by overflow chunks, if - // any. The first chunk returned might be the same as the original one - // or a newly allocated version. In any case, take the returned chunk as - // the relevant one and discard the original chunk. - Add(sample model.SamplePair) ([]Chunk, error) + // re-encoding, and creates any necessary overflow chunk. + // The returned Chunk is the overflow chunk if it was created. 
+ // The returned Chunk is nil if the sample got appended to the same chunk. + Add(sample model.SamplePair) (Chunk, error) // NewIterator returns an iterator for the chunks. // The iterator passed as argument is for re-use. Depending on implementation, // the iterator can be re-used or a new iterator can be allocated. @@ -123,12 +121,13 @@ func RangeValues(it Iterator, in metric.Interval) ([]model.SamplePair, error) { // addToOverflowChunk is a utility function that creates a new chunk as overflow // chunk, adds the provided sample to it, and returns a chunk slice containing // the provided old chunk followed by the new overflow chunk. -func addToOverflowChunk(c Chunk, s model.SamplePair) ([]Chunk, error) { - overflowChunks, err := New().Add(s) +func addToOverflowChunk(s model.SamplePair) (Chunk, error) { + overflowChunk := New() + _, err := overflowChunk.Add(s) if err != nil { return nil, err } - return []Chunk{c, overflowChunks[0]}, nil + return overflowChunk, nil } // transcodeAndAdd is a utility function that transcodes the dst chunk into the @@ -139,27 +138,33 @@ func transcodeAndAdd(dst Chunk, src Chunk, s model.SamplePair) ([]Chunk, error) Ops.WithLabelValues(Transcode).Inc() var ( - head = dst - body, NewChunks []Chunk - err error + head = dst + newChunk Chunk + body = []Chunk{head} + err error ) it := src.NewIterator(nil) for it.Scan() { - if NewChunks, err = head.Add(it.Value()); err != nil { + if newChunk, err = head.Add(it.Value()); err != nil { return nil, err } - body = append(body, NewChunks[:len(NewChunks)-1]...) 
- head = NewChunks[len(NewChunks)-1] + if newChunk != nil { + body = append(body, newChunk) + head = newChunk + } } if it.Err() != nil { return nil, it.Err() } - if NewChunks, err = head.Add(s); err != nil { + if newChunk, err = head.Add(s); err != nil { return nil, err } - return append(body, NewChunks...), nil + if newChunk != nil { + body = append(body, newChunk) + } + return body, nil } // indexAccessor allows accesses to samples by index. diff --git a/encoding/chunk_test.go b/encoding/chunk_test.go index c7bf8a3c0f19..f3038941d8f9 100644 --- a/encoding/chunk_test.go +++ b/encoding/chunk_test.go @@ -29,7 +29,7 @@ import ( func TestLen(t *testing.T) { chunks := []Chunk{} - for _, encoding := range []Encoding{Delta, DoubleDelta, Varbit} { + for _, encoding := range []Encoding{DoubleDelta, Varbit, Bigchunk} { c, err := NewForEncoding(encoding) if err != nil { t.Fatal(err) @@ -43,11 +43,12 @@ func TestLen(t *testing.T) { t.Errorf("chunk type %s should have %d samples, had %d", c.Encoding(), i, c.Len()) } - cs, _ := c.Add(model.SamplePair{ + cs, err := c.Add(model.SamplePair{ Timestamp: model.Time(i), Value: model.SampleValue(i), }) - c = cs[0] + require.NoError(t, err) + require.Nil(t, cs) } } } @@ -95,13 +96,12 @@ func mkChunk(t *testing.T, encoding Encoding, samples int) Chunk { require.NoError(t, err) for i := 0; i < samples; i++ { - chunks, err := chunk.Add(model.SamplePair{ + newChunk, err := chunk.Add(model.SamplePair{ Timestamp: model.Time(i * step), Value: model.SampleValue(i), }) require.NoError(t, err) - require.Len(t, chunks, 1) - chunk = chunks[0] + require.Nil(t, newChunk) } return chunk diff --git a/encoding/delta.go b/encoding/delta.go deleted file mode 100644 index 120f734c363e..000000000000 --- a/encoding/delta.go +++ /dev/null @@ -1,355 +0,0 @@ -// This file was taken from Prometheus (https://github.com/prometheus/prometheus). 
-// The original license header is included below: -// -// Copyright 2014 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package encoding - -import ( - "encoding/binary" - "fmt" - "io" - "math" - - "github.com/prometheus/common/model" -) - -// The 21-byte header of a delta-encoded chunk looks like: -// -// - time delta bytes: 1 bytes -// - value delta bytes: 1 bytes -// - is integer: 1 byte -// - base time: 8 bytes -// - base value: 8 bytes -// - used buf bytes: 2 bytes -const ( - deltaHeaderBytes = 21 - - deltaHeaderTimeBytesOffset = 0 - deltaHeaderValueBytesOffset = 1 - deltaHeaderIsIntOffset = 2 - deltaHeaderBaseTimeOffset = 3 - deltaHeaderBaseValueOffset = 11 - deltaHeaderBufLenOffset = 19 -) - -// A deltaEncodedChunk adaptively stores sample timestamps and values with a -// delta encoding of various types (int, float) and bit widths. However, once 8 -// bytes would be needed to encode a delta value, a fall-back to the absolute -// numbers happens (so that timestamps are saved directly as int64 and values as -// float64). It implements the chunk interface. -type deltaEncodedChunk []byte - -// newDeltaEncodedChunk returns a newly allocated deltaEncodedChunk. 
-func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncodedChunk { - if tb < 1 { - panic("need at least 1 time delta byte") - } - if length < deltaHeaderBytes+16 { - panic(fmt.Errorf( - "chunk length %d bytes is insufficient, need at least %d", - length, deltaHeaderBytes+16, - )) - } - c := make(deltaEncodedChunk, deltaHeaderIsIntOffset+1, length) - - c[deltaHeaderTimeBytesOffset] = byte(tb) - c[deltaHeaderValueBytesOffset] = byte(vb) - if vb < d8 && isInt { // Only use int for fewer than 8 value delta bytes. - c[deltaHeaderIsIntOffset] = 1 - } else { - c[deltaHeaderIsIntOffset] = 0 - } - - return &c -} - -// Add implements chunk. -func (c deltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) { - // TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation. - if c.Len() == 0 { - c = c[:deltaHeaderBytes] - binary.LittleEndian.PutUint64(c[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp)) - binary.LittleEndian.PutUint64(c[deltaHeaderBaseValueOffset:], math.Float64bits(float64(s.Value))) - } - - remainingBytes := cap(c) - len(c) - sampleSize := c.sampleSize() - - // Do we generally have space for another sample in this chunk? If not, - // overflow into a new one. - if remainingBytes < sampleSize { - return addToOverflowChunk(&c, s) - } - - baseValue := c.baseValue() - dt := s.Timestamp - c.baseTime() - if dt < 0 { - return nil, fmt.Errorf("time delta is less than zero: %v", dt) - } - - dv := s.Value - baseValue - tb := c.timeBytes() - vb := c.valueBytes() - isInt := c.isInt() - - // If the new sample is incompatible with the current encoding, reencode the - // existing chunk data into new chunk(s). - - ntb, nvb, nInt := tb, vb, isInt - if isInt && !isInt64(dv) { - // int->float. - nvb = d4 - nInt = false - } else if !isInt && vb == d4 && baseValue+model.SampleValue(float32(dv)) != s.Value { - // float32->float64. - nvb = d8 - } else { - if tb < d8 { - // Maybe more bytes for timestamp. 
- ntb = max(tb, bytesNeededForUnsignedTimestampDelta(dt)) - } - if c.isInt() && vb < d8 { - // Maybe more bytes for sample value. - nvb = max(vb, bytesNeededForIntegerSampleValueDelta(dv)) - } - } - if tb != ntb || vb != nvb || isInt != nInt { - if len(c)*2 < cap(c) { - return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s) - } - // Chunk is already half full. Better create a new one and save the transcoding efforts. - return addToOverflowChunk(&c, s) - } - - offset := len(c) - c = c[:offset+sampleSize] - - switch tb { - case d1: - c[offset] = byte(dt) - case d2: - binary.LittleEndian.PutUint16(c[offset:], uint16(dt)) - case d4: - binary.LittleEndian.PutUint32(c[offset:], uint32(dt)) - case d8: - // Store the absolute value (no delta) in case of d8. - binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp)) - default: - return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb) - } - - offset += int(tb) - - if c.isInt() { - switch vb { - case d0: - // No-op. Constant value is stored as base value. - case d1: - c[offset] = byte(int8(dv)) - case d2: - binary.LittleEndian.PutUint16(c[offset:], uint16(int16(dv))) - case d4: - binary.LittleEndian.PutUint32(c[offset:], uint32(int32(dv))) - // d8 must not happen. Those samples are encoded as float64. - default: - return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb) - } - } else { - switch vb { - case d4: - binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(dv))) - case d8: - // Store the absolute value (no delta) in case of d8. - binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value))) - default: - return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb) - } - } - return []Chunk{&c}, nil -} - -func (c *deltaEncodedChunk) Slice(_, _ model.Time) Chunk { - return c -} - -// NewIterator implements chunk. 
-func (c *deltaEncodedChunk) NewIterator(_ Iterator) Iterator { - return newIndexAccessingChunkIterator(c.Len(), &deltaEncodedIndexAccessor{ - c: *c, - baseT: c.baseTime(), - baseV: c.baseValue(), - tBytes: c.timeBytes(), - vBytes: c.valueBytes(), - isInt: c.isInt(), - }) -} - -// Marshal implements chunk. -func (c deltaEncodedChunk) Marshal(w io.Writer) error { - if len(c) > math.MaxUint16 { - panic("chunk buffer length would overflow a 16 bit uint.") - } - binary.LittleEndian.PutUint16(c[deltaHeaderBufLenOffset:], uint16(len(c))) - - n, err := w.Write(c[:cap(c)]) - if err != nil { - return err - } - if n != cap(c) { - return fmt.Errorf("wanted to write %d bytes, wrote %d", cap(c), n) - } - return nil -} - -// UnmarshalFromBuf implements chunk. -func (c *deltaEncodedChunk) UnmarshalFromBuf(buf []byte) error { - *c = (*c)[:cap(*c)] - copy(*c, buf) - return c.setLen() -} - -// setLen sets the length of the underlying slice and performs some sanity checks. -func (c *deltaEncodedChunk) setLen() error { - l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:]) - if int(l) > cap(*c) { - return fmt.Errorf("delta chunk length exceeded during unmarshalling: %d", l) - } - if int(l) < deltaHeaderBytes { - return fmt.Errorf("delta chunk length less than header size: %d < %d", l, deltaHeaderBytes) - } - switch c.timeBytes() { - case d1, d2, d4, d8: - // Pass. - default: - return fmt.Errorf("invalid number of time bytes in delta chunk: %d", c.timeBytes()) - } - switch c.valueBytes() { - case d0, d1, d2, d4, d8: - // Pass. - default: - return fmt.Errorf("invalid number of value bytes in delta chunk: %d", c.valueBytes()) - } - *c = (*c)[:l] - return nil -} - -// Encoding implements chunk. -func (c deltaEncodedChunk) Encoding() Encoding { return Delta } - -// Utilization implements chunk. 
-func (c deltaEncodedChunk) Utilization() float64 { - return float64(len(c)) / float64(cap(c)) -} - -func (c deltaEncodedChunk) timeBytes() deltaBytes { - return deltaBytes(c[deltaHeaderTimeBytesOffset]) -} - -func (c deltaEncodedChunk) valueBytes() deltaBytes { - return deltaBytes(c[deltaHeaderValueBytesOffset]) -} - -func (c deltaEncodedChunk) isInt() bool { - return c[deltaHeaderIsIntOffset] == 1 -} - -func (c deltaEncodedChunk) baseTime() model.Time { - return model.Time(binary.LittleEndian.Uint64(c[deltaHeaderBaseTimeOffset:])) -} - -func (c deltaEncodedChunk) baseValue() model.SampleValue { - return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[deltaHeaderBaseValueOffset:]))) -} - -func (c deltaEncodedChunk) sampleSize() int { - return int(c.timeBytes() + c.valueBytes()) -} - -// Len implements Chunk. Runs in constant time. -func (c deltaEncodedChunk) Len() int { - if len(c) < deltaHeaderBytes { - return 0 - } - return (len(c) - deltaHeaderBytes) / c.sampleSize() -} - -func (c deltaEncodedChunk) Size() int { - return len(c) -} - -// deltaEncodedIndexAccessor implements indexAccessor. -type deltaEncodedIndexAccessor struct { - c deltaEncodedChunk - baseT model.Time - baseV model.SampleValue - tBytes, vBytes deltaBytes - isInt bool - lastErr error -} - -func (acc *deltaEncodedIndexAccessor) err() error { - return acc.lastErr -} - -func (acc *deltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time { - offset := deltaHeaderBytes + idx*int(acc.tBytes+acc.vBytes) - - switch acc.tBytes { - case d1: - return acc.baseT + model.Time(uint8(acc.c[offset])) - case d2: - return acc.baseT + model.Time(binary.LittleEndian.Uint16(acc.c[offset:])) - case d4: - return acc.baseT + model.Time(binary.LittleEndian.Uint32(acc.c[offset:])) - case d8: - // Take absolute value for d8. 
- return model.Time(binary.LittleEndian.Uint64(acc.c[offset:])) - default: - acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes) - return model.Earliest - } -} - -func (acc *deltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue { - offset := deltaHeaderBytes + idx*int(acc.tBytes+acc.vBytes) + int(acc.tBytes) - - if acc.isInt { - switch acc.vBytes { - case d0: - return acc.baseV - case d1: - return acc.baseV + model.SampleValue(int8(acc.c[offset])) - case d2: - return acc.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[offset:]))) - case d4: - return acc.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[offset:]))) - // No d8 for ints. - default: - acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes) - return 0 - } - } else { - switch acc.vBytes { - case d4: - return acc.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[offset:]))) - case d8: - // Take absolute value for d8. - return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:]))) - default: - acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes) - return 0 - } - } -} diff --git a/encoding/delta_test.go b/encoding/delta_test.go deleted file mode 100644 index 3c014c60e53d..000000000000 --- a/encoding/delta_test.go +++ /dev/null @@ -1,113 +0,0 @@ -// This file was taken from Prometheus (https://github.com/prometheus/prometheus). -// The original license header is included below: -// -// Copyright 2016 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Note: this file has tests for code in both delta.go and doubledelta.go -- -// it may make sense to split those out later, but given that the tests are -// near-identical and share a helper, this feels simpler for now. - -package encoding - -import ( - "bytes" - "encoding/binary" - "strings" - "testing" - - "github.com/prometheus/common/model" -) - -func TestUnmarshallingCorruptedDeltaReturnsAnError(t *testing.T) { - - var verifyUnmarshallingError = func( - err error, - chunkTypeName string, - unmarshalMethod string, - expectedStr string, - ) { - - if err == nil { - t.Errorf("Failed to obtain an error when unmarshalling corrupt %s (from %s)", chunkTypeName, unmarshalMethod) - return - } - - if !strings.Contains(err.Error(), expectedStr) { - t.Errorf( - "'%s' not present in error when unmarshalling corrupt %s (from %s): '%s'", - expectedStr, - chunkTypeName, - unmarshalMethod, - err.Error()) - } - } - - cases := []struct { - chunkTypeName string - chunkConstructor func(deltaBytes, deltaBytes, bool, int) Chunk - minHeaderLen int - chunkLenPos int - timeBytesPos int - }{ - { - chunkTypeName: "deltaEncodedChunk", - chunkConstructor: func(a, b deltaBytes, c bool, d int) Chunk { - return newDeltaEncodedChunk(a, b, c, d) - }, - minHeaderLen: deltaHeaderBytes, - chunkLenPos: deltaHeaderBufLenOffset, - timeBytesPos: deltaHeaderTimeBytesOffset, - }, - { - chunkTypeName: "doubleDeltaEncodedChunk", - chunkConstructor: func(a, b deltaBytes, c bool, d int) Chunk { - return newDoubleDeltaEncodedChunk(a, b, c, d) - }, - minHeaderLen: 
doubleDeltaHeaderMinBytes, - chunkLenPos: doubleDeltaHeaderBufLenOffset, - timeBytesPos: doubleDeltaHeaderTimeBytesOffset, - }, - } - for _, c := range cases { - chunk := c.chunkConstructor(d1, d4, false, ChunkLen) - - cs, err := chunk.Add(model.SamplePair{ - Timestamp: model.Now(), - Value: model.SampleValue(100), - }) - if err != nil { - t.Fatalf("Couldn't add sample to empty %s: %s", c.chunkTypeName, err) - } - - var writer bytes.Buffer - cs[0].Marshal(&writer) - - // Corrupt time byte to 0, which is illegal. - buf := writer.Bytes() - buf[c.timeBytesPos] = 0 - err = cs[0].UnmarshalFromBuf(buf) - verifyUnmarshallingError(err, c.chunkTypeName, "buf", "invalid number of time bytes") - - // Fix the corruption to go on. - buf[c.timeBytesPos] = byte(d1) - - // Corrupt the length to be every possible too-small value - for i := 0; i < c.minHeaderLen; i++ { - binary.LittleEndian.PutUint16(buf[c.chunkLenPos:], uint16(i)) - - err = cs[0].UnmarshalFromBuf(buf) - verifyUnmarshallingError(err, c.chunkTypeName, "buf", "header size") - } - } -} diff --git a/encoding/doubledelta.go b/encoding/doubledelta.go index b9ca6f0402b8..683ce844eef6 100644 --- a/encoding/doubledelta.go +++ b/encoding/doubledelta.go @@ -84,26 +84,28 @@ func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doub } // Add implements chunk. -func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) { +func (c *doubleDeltaEncodedChunk) Add(s model.SamplePair) (Chunk, error) { // TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation. if c.Len() == 0 { - return c.addFirstSample(s), nil + c.addFirstSample(s) + return nil, nil } tb := c.timeBytes() vb := c.valueBytes() if c.Len() == 1 { - return c.addSecondSample(s, tb, vb) + err := c.addSecondSample(s, tb, vb) + return nil, err } - remainingBytes := cap(c) - len(c) + remainingBytes := cap(*c) - len(*c) sampleSize := c.sampleSize() // Do we generally have space for another sample in this chunk? 
If not, // overflow into a new one. if remainingBytes < sampleSize { - return addToOverflowChunk(&c, s) + return addToOverflowChunk(s) } projectedTime := c.baseTime() + model.Time(c.Len())*c.baseTimeDelta() @@ -133,26 +135,47 @@ func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) { } } if tb != ntb || vb != nvb || c.isInt() != nInt { - if len(c)*2 < cap(c) { - return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s) + if len(*c)*2 < cap(*c) { + result, err := transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(*c)), c, s) + if err != nil { + return nil, err + } + // We cannot handle >2 chunks returned as we can only return 1 chunk. + // Ideally there won't be >2 chunks, but if it happens to be >2, + // we fall through to perform `addToOverflowChunk` instead. + if len(result) == 1 { + // Replace the current chunk with the new bigger chunk. + c0 := result[0].(*doubleDeltaEncodedChunk) + *c = *c0 + return nil, nil + } else if len(result) == 2 { + // Replace the current chunk with the new bigger chunk + // and return the additional chunk. + c0 := result[0].(*doubleDeltaEncodedChunk) + c1 := result[1].(*doubleDeltaEncodedChunk) + *c = *c0 + return c1, nil + } } + // Chunk is already half full. Better create a new one and save the transcoding efforts. - return addToOverflowChunk(&c, s) + // We also perform this if `transcodeAndAdd` resulted in >2 chunks. + return addToOverflowChunk(s) } - offset := len(c) - c = c[:offset+sampleSize] + offset := len(*c) + (*c) = (*c)[:offset+sampleSize] switch tb { case d1: - c[offset] = byte(ddt) + (*c)[offset] = byte(ddt) case d2: - binary.LittleEndian.PutUint16(c[offset:], uint16(ddt)) + binary.LittleEndian.PutUint16((*c)[offset:], uint16(ddt)) case d4: - binary.LittleEndian.PutUint32(c[offset:], uint32(ddt)) + binary.LittleEndian.PutUint32((*c)[offset:], uint32(ddt)) case d8: // Store the absolute value (no delta) in case of d8. 
- binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp)) + binary.LittleEndian.PutUint64((*c)[offset:], uint64(s.Timestamp)) default: return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb) } @@ -164,11 +187,11 @@ func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) { case d0: // No-op. Constant delta is stored as base value. case d1: - c[offset] = byte(int8(ddv)) + (*c)[offset] = byte(int8(ddv)) case d2: - binary.LittleEndian.PutUint16(c[offset:], uint16(int16(ddv))) + binary.LittleEndian.PutUint16((*c)[offset:], uint16(int16(ddv))) case d4: - binary.LittleEndian.PutUint32(c[offset:], uint32(int32(ddv))) + binary.LittleEndian.PutUint32((*c)[offset:], uint32(int32(ddv))) // d8 must not happen. Those samples are encoded as float64. default: return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb) @@ -176,15 +199,15 @@ func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) { } else { switch vb { case d4: - binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(ddv))) + binary.LittleEndian.PutUint32((*c)[offset:], math.Float32bits(float32(ddv))) case d8: // Store the absolute value (no delta) in case of d8. - binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value))) + binary.LittleEndian.PutUint64((*c)[offset:], math.Float64bits(float64(s.Value))) default: return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb) } } - return []Chunk{&c}, nil + return nil, nil } // FirstTime implements chunk. @@ -243,15 +266,15 @@ func (c doubleDeltaEncodedChunk) MarshalToBuf(buf []byte) error { // UnmarshalFromBuf implements chunk. func (c *doubleDeltaEncodedChunk) UnmarshalFromBuf(buf []byte) error { - *c = (*c)[:cap(*c)] - copy(*c, buf) + (*c) = (*c)[:cap((*c))] + copy((*c), buf) return c.setLen() } // setLen sets the length of the underlying slice and performs some sanity checks. 
func (c *doubleDeltaEncodedChunk) setLen() error { l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:]) - if int(l) > cap(*c) { + if int(l) > cap((*c)) { return fmt.Errorf("doubledelta chunk length exceeded during unmarshalling: %d", l) } if int(l) < doubleDeltaHeaderMinBytes { @@ -269,7 +292,7 @@ func (c *doubleDeltaEncodedChunk) setLen() error { default: return fmt.Errorf("invalid number of value bytes in doubledelta chunk: %d", c.valueBytes()) } - *c = (*c)[:l] + (*c) = (*c)[:l] return nil } @@ -356,40 +379,39 @@ func (c doubleDeltaEncodedChunk) isInt() bool { // addFirstSample is a helper method only used by c.add(). It adds timestamp and // value as base time and value. -func (c doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) []Chunk { - c = c[:doubleDeltaHeaderBaseValueOffset+8] +func (c *doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) { + (*c) = (*c)[:doubleDeltaHeaderBaseValueOffset+8] binary.LittleEndian.PutUint64( - c[doubleDeltaHeaderBaseTimeOffset:], + (*c)[doubleDeltaHeaderBaseTimeOffset:], uint64(s.Timestamp), ) binary.LittleEndian.PutUint64( - c[doubleDeltaHeaderBaseValueOffset:], + (*c)[doubleDeltaHeaderBaseValueOffset:], math.Float64bits(float64(s.Value)), ) - return []Chunk{&c} } // addSecondSample is a helper method only used by c.add(). It calculates the // base delta from the provided sample and adds it to the chunk. 
-func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) ([]Chunk, error) { +func (c *doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) error { baseTimeDelta := s.Timestamp - c.baseTime() if baseTimeDelta < 0 { - return nil, fmt.Errorf("base time delta is less than zero: %v", baseTimeDelta) + return fmt.Errorf("base time delta is less than zero: %v", baseTimeDelta) } - c = c[:doubleDeltaHeaderBytes] + (*c) = (*c)[:doubleDeltaHeaderBytes] if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 { // If already the base delta needs d8 (or we are at d8 // already, anyway), we better encode this timestamp // directly rather than as a delta and switch everything // to d8. - c[doubleDeltaHeaderTimeBytesOffset] = byte(d8) + (*c)[doubleDeltaHeaderTimeBytesOffset] = byte(d8) binary.LittleEndian.PutUint64( - c[doubleDeltaHeaderBaseTimeDeltaOffset:], + (*c)[doubleDeltaHeaderBaseTimeDeltaOffset:], uint64(s.Timestamp), ) } else { binary.LittleEndian.PutUint64( - c[doubleDeltaHeaderBaseTimeDeltaOffset:], + (*c)[doubleDeltaHeaderBaseTimeDeltaOffset:], uint64(baseTimeDelta), ) } @@ -400,19 +422,19 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb delt // if we are at d8 already, anyway), we better encode // this value directly rather than as a delta and switch // everything to d8. 
- c[doubleDeltaHeaderValueBytesOffset] = byte(d8) - c[doubleDeltaHeaderIsIntOffset] = 0 + (*c)[doubleDeltaHeaderValueBytesOffset] = byte(d8) + (*c)[doubleDeltaHeaderIsIntOffset] = 0 binary.LittleEndian.PutUint64( - c[doubleDeltaHeaderBaseValueDeltaOffset:], + (*c)[doubleDeltaHeaderBaseValueDeltaOffset:], math.Float64bits(float64(s.Value)), ) } else { binary.LittleEndian.PutUint64( - c[doubleDeltaHeaderBaseValueDeltaOffset:], + (*c)[doubleDeltaHeaderBaseValueDeltaOffset:], math.Float64bits(float64(baseValueDelta)), ) } - return []Chunk{&c}, nil + return nil } // doubleDeltaEncodedIndexAccessor implements indexAccessor. diff --git a/encoding/factory.go b/encoding/factory.go index 83ab9f2602d5..5ac314d9d0fd 100644 --- a/encoding/factory.go +++ b/encoding/factory.go @@ -1,6 +1,7 @@ package encoding import ( + "errors" "flag" "fmt" "strconv" @@ -26,6 +27,15 @@ func (Config) RegisterFlags(f *flag.FlagSet) { flag.IntVar(&bigchunkSizeCapBytes, "store.bigchunk-size-cap-bytes", bigchunkSizeCapBytes, "When using bigchunk encoding, start a new bigchunk if over this size (0 = unlimited)") } +// Validate errors out if the encoding is set to Delta. +func (Config) Validate() error { + if DefaultEncoding == Delta { + // Delta is deprecated. + return errors.New("delta encoding is deprecated") + } + return nil +} + // String implements flag.Value. func (e Encoding) String() string { if known, found := encodings[e]; found { @@ -35,7 +45,8 @@ func (e Encoding) String() string { } const ( - // Delta encoding + // Delta encoding is no longer supported and will be automatically changed to DoubleDelta. + // It still exists here to not change the `ingester.chunk-encoding` flag values. 
Delta Encoding = iota // DoubleDelta encoding DoubleDelta @@ -51,12 +62,6 @@ type encoding struct { } var encodings = map[Encoding]encoding{ - Delta: { - Name: "Delta", - New: func() Chunk { - return newDeltaEncodedChunk(d1, d0, true, ChunkLen) - }, - }, DoubleDelta: { Name: "DoubleDelta", New: func() Chunk { diff --git a/encoding/varbit.go b/encoding/varbit.go index c2663fe9e247..2df8abc48271 100644 --- a/encoding/varbit.go +++ b/encoding/varbit.go @@ -260,17 +260,20 @@ func newVarbitChunk(enc varbitValueEncoding) *varbitChunk { } // Add implements chunk. -func (c *varbitChunk) Add(s model.SamplePair) ([]Chunk, error) { +func (c *varbitChunk) Add(s model.SamplePair) (Chunk, error) { offset := c.nextSampleOffset() switch { case c.closed(): - return addToOverflowChunk(c, s) + return addToOverflowChunk(s) case offset > varbitNextSampleBitOffsetThreshold: - return c.addLastSample(s), nil + c.addLastSample(s) + return nil, nil case offset == varbitFirstSampleBitOffset: - return c.addFirstSample(s), nil + c.addFirstSample(s) + return nil, nil case offset == varbitSecondSampleBitOffset: - return c.addSecondSample(s) + err := c.addSecondSample(s) + return nil, err } return c.addLaterSample(s, offset) } @@ -492,7 +495,7 @@ func (c varbitChunk) setLastSample(s model.SamplePair) { // addFirstSample is a helper method only used by c.add(). It adds timestamp and // value as base time and value. -func (c *varbitChunk) addFirstSample(s model.SamplePair) []Chunk { +func (c *varbitChunk) addFirstSample(s model.SamplePair) { binary.BigEndian.PutUint64( (*c)[varbitFirstTimeOffset:], uint64(s.Timestamp), @@ -503,21 +506,21 @@ func (c *varbitChunk) addFirstSample(s model.SamplePair) []Chunk { ) c.setLastSample(s) // To simplify handling of single-sample chunks. c.setNextSampleOffset(varbitSecondSampleBitOffset) - return []Chunk{c} } // addSecondSample is a helper method only used by c.add(). 
It calculates the // first time delta from the provided sample and adds it to the chunk together // with the provided sample as the last sample. -func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]Chunk, error) { +func (c *varbitChunk) addSecondSample(s model.SamplePair) error { firstTimeDelta := s.Timestamp - c.firstTime() if firstTimeDelta < 0 { - return nil, fmt.Errorf("first Δt is less than zero: %v", firstTimeDelta) + return fmt.Errorf("first Δt is less than zero: %v", firstTimeDelta) } if firstTimeDelta > varbitMaxTimeDelta { // A time delta too great. Still, we can add it as a last sample // before overflowing. - return c.addLastSample(s), nil + c.addLastSample(s) + return nil } (*c)[varbitFirstTimeDeltaOffset] = byte(firstTimeDelta >> 16) (*c)[varbitFirstTimeDeltaOffset+1] = byte(firstTimeDelta >> 8) @@ -529,7 +532,7 @@ func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]Chunk, error) { c.setLastSample(s) c.setNextSampleOffset(varbitThirdSampleBitOffset) - return []Chunk{c}, nil + return nil } // addLastSample is a helper method only used by c.add() and in other helper @@ -538,15 +541,15 @@ func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]Chunk, error) { // adds the very last sample added to this chunk ever, while setLastSample sets // the sample most recently added to the chunk so that it can be used for the // calculations required to add the next sample. -func (c *varbitChunk) addLastSample(s model.SamplePair) []Chunk { +func (c *varbitChunk) addLastSample(s model.SamplePair) { c.setLastSample(s) (*c)[varbitFlagOffset] |= 0x80 - return []Chunk{c} + return } // addLaterSample is a helper method only used by c.add(). It adds a third or // later sample. 
-func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]Chunk, error) { +func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) (Chunk, error) { var ( lastTime = c.lastTime() lastTimeDelta = c.lastTimeDelta() @@ -564,39 +567,88 @@ func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]Chunk if newTimeDelta > varbitMaxTimeDelta { // A time delta too great. Still, we can add it as a last sample // before overflowing. - return c.addLastSample(s), nil + c.addLastSample(s) + return nil, nil } // Analyze worst case, does it fit? If not, set new sample as the last. if int(offset)+varbitWorstCaseBitsPerSample[encoding] > ChunkLen*8 { - return c.addLastSample(s), nil + c.addLastSample(s) + return nil, nil } // Transcoding/overflow decisions first. if encoding == varbitZeroEncoding && s.Value != lastValue { // Cannot go on with zero encoding. - if offset > ChunkLen*4 { - // Chunk already half full. Don't transcode, overflow instead. - return addToOverflowChunk(c, s) - } - if isInt32(s.Value - lastValue) { - // Trying int encoding looks promising. - return transcodeAndAdd(newVarbitChunk(varbitIntDoubleDeltaEncoding), c, s) + if offset <= ChunkLen*4 { + var result []Chunk + var err error + if isInt32(s.Value - lastValue) { + // Trying int encoding looks promising. + result, err = transcodeAndAdd(newVarbitChunk(varbitIntDoubleDeltaEncoding), c, s) + } else { + result, err = transcodeAndAdd(newVarbitChunk(varbitXOREncoding), c, s) + } + if err != nil { + return nil, err + } + + // We cannot handle >2 chunks returned as we can only return 1 chunk. + // Ideally there won't be >2 chunks, but if it happens to be >2, + // we fall through to perform `addToOverflowChunk` instead. + if len(result) == 1 { + // Replace the current chunk with the new bigger chunk.
+ c0 := result[0].(*varbitChunk) + *c = *c0 + return nil, nil + } else if len(result) == 2 { + // Replace the current chunk with the new bigger chunk + // and return the additional chunk. + c0 := result[0].(*varbitChunk) + c1 := result[1].(*varbitChunk) + *c = *c0 + return c1, nil + } } - return transcodeAndAdd(newVarbitChunk(varbitXOREncoding), c, s) + + // Chunk is already half full. Better create a new one and save the transcoding efforts. + // We also perform this if `transcodeAndAdd` resulted in >2 chunks. + return addToOverflowChunk(s) } if encoding == varbitIntDoubleDeltaEncoding && !isInt32(s.Value-lastValue) { // Cannot go on with int encoding. - if offset > ChunkLen*4 { - // Chunk already half full. Don't transcode, overflow instead. - return addToOverflowChunk(c, s) + if offset <= ChunkLen*4 { + result, err := transcodeAndAdd(newVarbitChunk(varbitXOREncoding), c, s) + if err != nil { + return nil, err + } + // We cannot handle >2 chunks returned as we can only return 1 chunk. + // Ideally there won't be >2 chunks, but if it happens to be >2, + // we fall through to perform `addToOverflowChunk` instead. + if len(result) == 1 { + // Replace the current chunk with the new bigger chunk. + c0 := result[0].(*varbitChunk) + *c = *c0 + return nil, nil + } else if len(result) == 2 { + // Replace the current chunk with the new bigger chunk + // and return the additional chunk. + c0 := result[0].(*varbitChunk) + c1 := result[1].(*varbitChunk) + *c = *c0 + return c1, nil + } } - return transcodeAndAdd(newVarbitChunk(varbitXOREncoding), c, s) + + // Chunk is already half full. Better create a new one and save the transcoding efforts. + // We also perform this if `transcodeAndAdd` resulted in >2 chunks.
+ return addToOverflowChunk(s) } offset, overflow := c.addDDTime(offset, lastTimeDelta, newTimeDelta) if overflow { - return c.addLastSample(s), nil + c.addLastSample(s) + return nil, nil } switch encoding { case varbitZeroEncoding: @@ -613,7 +665,7 @@ func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]Chunk c.setNextSampleOffset(offset) c.setLastSample(s) - return []Chunk{c}, nil + return nil, nil } func (c varbitChunk) prepForThirdSample( diff --git a/testutils/testutils.go b/testutils/testutils.go index 0a5f1cf9dbec..667fcd1a3bac 100644 --- a/testutils/testutils.go +++ b/testutils/testutils.go @@ -80,12 +80,13 @@ func dummyChunk(now model.Time) chunk.Chunk { } func dummyChunkFor(now model.Time, metric labels.Labels) chunk.Chunk { - cs, _ := promchunk.New().Add(model.SamplePair{Timestamp: now, Value: 0}) + cs := promchunk.New() + cs.Add(model.SamplePair{Timestamp: now, Value: 0}) chunk := chunk.NewChunk( userID, client.Fingerprint(metric), metric, - cs[0], + cs, now.Add(-time.Hour), now, )