Skip to content

Commit

Permalink
Refactor encoding.Chunk.Add (grafana#1706)
Browse files Browse the repository at this point in the history
* Refactor encoding.Chunk.Add

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Undo pointer-to-slice changes in varbit

Signed-off-by: Bryan Boreham <bryan@weave.works>

* Error on setting Delta encoding

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Don't replace Delta with DoubleDelta

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
  • Loading branch information
codesome authored and gouthamve committed Oct 18, 2019
1 parent c11709c commit 7910fa4
Show file tree
Hide file tree
Showing 13 changed files with 216 additions and 588 deletions.
9 changes: 6 additions & 3 deletions cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,26 @@ func fillCache(t *testing.T, cache cache.Cache) ([]string, []chunk.Chunk) {
chunks := []chunk.Chunk{}
for i := 0; i < 100; i++ {
ts := model.TimeFromUnix(int64(i * chunkLen))
promChunk, _ := prom_chunk.New().Add(model.SamplePair{
promChunk := prom_chunk.New()
nc, err := promChunk.Add(model.SamplePair{
Timestamp: ts,
Value: model.SampleValue(i),
})
require.NoError(t, err)
require.Nil(t, nc)
c := chunk.NewChunk(
userID,
model.Fingerprint(1),
labels.Labels{
{Name: model.MetricNameLabel, Value: "foo"},
{Name: "bar", Value: "baz"},
},
promChunk[0],
promChunk,
ts,
ts.Add(chunkLen),
)

err := c.Encode()
err = c.Encode()
require.NoError(t, err)
buf, err := c.Encoded()
require.NoError(t, err)
Expand Down
18 changes: 12 additions & 6 deletions chunk_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,22 +579,25 @@ func TestChunkStoreRandom(t *testing.T) {
const chunkLen = 2 * 3600 // in seconds
for i := 0; i < 100; i++ {
ts := model.TimeFromUnix(int64(i * chunkLen))
chunks, _ := encoding.New().Add(model.SamplePair{
ch := encoding.New()
nc, err := ch.Add(model.SamplePair{
Timestamp: ts,
Value: model.SampleValue(float64(i)),
})
require.NoError(t, err)
require.Nil(t, nc)
chunk := NewChunk(
userID,
model.Fingerprint(1),
labels.Labels{
{Name: labels.MetricName, Value: "foo"},
{Name: "bar", Value: "baz"},
},
chunks[0],
ch,
ts,
ts.Add(chunkLen*time.Second).Add(-1*time.Second),
)
err := chunk.Encode()
err = chunk.Encode()
require.NoError(t, err)
err = store.Put(ctx, []Chunk{chunk})
require.NoError(t, err)
Expand Down Expand Up @@ -644,23 +647,26 @@ func TestChunkStoreLeastRead(t *testing.T) {
const chunkLen = 60 // in seconds
for i := 0; i < 24; i++ {
ts := model.TimeFromUnix(int64(i * chunkLen))
chunks, _ := encoding.New().Add(model.SamplePair{
ch := encoding.New()
nc, err := ch.Add(model.SamplePair{
Timestamp: ts,
Value: model.SampleValue(float64(i)),
})
require.NoError(t, err)
require.Nil(t, nc)
chunk := NewChunk(
userID,
model.Fingerprint(1),
labels.Labels{
{Name: labels.MetricName, Value: "foo"},
{Name: "bar", Value: "baz"},
},
chunks[0],
ch,
ts,
ts.Add(chunkLen*time.Second),
)
t.Logf("Loop %d", i)
err := chunk.Encode()
err = chunk.Encode()
require.NoError(t, err)
err = store.Put(ctx, []Chunk{chunk})
require.NoError(t, err)
Expand Down
6 changes: 4 additions & 2 deletions chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ func dummyChunkForEncoding(now model.Time, metric labels.Labels, enc encoding.En
c, _ := encoding.NewForEncoding(enc)
for i := 0; i < samples; i++ {
t := time.Duration(i) * 15 * time.Second
cs, err := c.Add(model.SamplePair{Timestamp: now.Add(t), Value: 0})
nc, err := c.Add(model.SamplePair{Timestamp: now.Add(t), Value: 0})
if err != nil {
panic(err)
}
c = cs[0]
if nc != nil {
panic("returned chunk was not nil")
}
}
chunk := NewChunk(
userID,
Expand Down
6 changes: 3 additions & 3 deletions encoding/bigchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ func newBigchunk() *bigchunk {
return &bigchunk{}
}

func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) {
func (b *bigchunk) Add(sample model.SamplePair) (Chunk, error) {
if b.remainingSamples == 0 {
if bigchunkSizeCapBytes > 0 && b.Size() > bigchunkSizeCapBytes {
return addToOverflowChunk(b, sample)
return addToOverflowChunk(sample)
}
if err := b.addNextChunk(sample.Timestamp); err != nil {
return nil, err
Expand All @@ -44,7 +44,7 @@ func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) {

b.appender.Append(int64(sample.Timestamp), float64(sample.Value))
b.remainingSamples--
return []Chunk{b}, nil
return nil, nil
}

// addNextChunk adds a new XOR "subchunk" to the internal list of chunks.
Expand Down
8 changes: 4 additions & 4 deletions encoding/bigchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (
func TestSliceBiggerChunk(t *testing.T) {
var c Chunk = newBigchunk()
for i := 0; i < 12*3600/15; i++ {
cs, err := c.Add(model.SamplePair{
nc, err := c.Add(model.SamplePair{
Timestamp: model.Time(i * step),
Value: model.SampleValue(i),
})
require.NoError(t, err)
c = cs[0]
require.Nil(t, nc)
}

// Test for when the slice aligns perfectly with the sub-chunk boundaries.
Expand Down Expand Up @@ -69,12 +69,12 @@ func BenchmarkBiggerChunkMemory(b *testing.B) {
for i := 0; i < b.N; i++ {
var c Chunk = newBigchunk()
for i := 0; i < 12*3600/15; i++ {
cs, err := c.Add(model.SamplePair{
nc, err := c.Add(model.SamplePair{
Timestamp: model.Time(i * step),
Value: model.SampleValue(i),
})
require.NoError(b, err)
c = cs[0]
require.Nil(b, nc)
}

c.(*bigchunk).printSize()
Expand Down
39 changes: 22 additions & 17 deletions encoding/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,10 @@ var (
// goroutine-safe.
type Chunk interface {
// Add adds a SamplePair to the chunks, performs any necessary
// re-encoding, and adds any necessary overflow chunks. It returns the
// new version of the original chunk, followed by overflow chunks, if
// any. The first chunk returned might be the same as the original one
// or a newly allocated version. In any case, take the returned chunk as
// the relevant one and discard the original chunk.
Add(sample model.SamplePair) ([]Chunk, error)
// re-encoding, and creates any necessary overflow chunk.
// The returned Chunk is the overflow chunk if it was created.
// The returned Chunk is nil if the sample got appended to the same chunk.
Add(sample model.SamplePair) (Chunk, error)
// NewIterator returns an iterator for the chunks.
// The iterator passed as argument is for re-use. Depending on implementation,
// the iterator can be re-used or a new iterator can be allocated.
Expand Down Expand Up @@ -123,12 +121,13 @@ func RangeValues(it Iterator, in metric.Interval) ([]model.SamplePair, error) {
// addToOverflowChunk is a utility function that creates a new overflow chunk,
// adds the provided sample to it, and returns the newly created overflow
// chunk. (The caller keeps its original chunk; only the overflow is returned.)
func addToOverflowChunk(c Chunk, s model.SamplePair) ([]Chunk, error) {
overflowChunks, err := New().Add(s)
func addToOverflowChunk(s model.SamplePair) (Chunk, error) {
overflowChunk := New()
_, err := overflowChunk.Add(s)
if err != nil {
return nil, err
}
return []Chunk{c, overflowChunks[0]}, nil
return overflowChunk, nil
}

// transcodeAndAdd is a utility function that transcodes the dst chunk into the
Expand All @@ -139,27 +138,33 @@ func transcodeAndAdd(dst Chunk, src Chunk, s model.SamplePair) ([]Chunk, error)
Ops.WithLabelValues(Transcode).Inc()

var (
head = dst
body, NewChunks []Chunk
err error
head = dst
newChunk Chunk
body = []Chunk{head}
err error
)

it := src.NewIterator(nil)
for it.Scan() {
if NewChunks, err = head.Add(it.Value()); err != nil {
if newChunk, err = head.Add(it.Value()); err != nil {
return nil, err
}
body = append(body, NewChunks[:len(NewChunks)-1]...)
head = NewChunks[len(NewChunks)-1]
if newChunk != nil {
body = append(body, newChunk)
head = newChunk
}
}
if it.Err() != nil {
return nil, it.Err()
}

if NewChunks, err = head.Add(s); err != nil {
if newChunk, err = head.Add(s); err != nil {
return nil, err
}
return append(body, NewChunks...), nil
if newChunk != nil {
body = append(body, newChunk)
}
return body, nil
}

// indexAccessor allows accesses to samples by index.
Expand Down
12 changes: 6 additions & 6 deletions encoding/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

func TestLen(t *testing.T) {
chunks := []Chunk{}
for _, encoding := range []Encoding{Delta, DoubleDelta, Varbit} {
for _, encoding := range []Encoding{DoubleDelta, Varbit, Bigchunk} {
c, err := NewForEncoding(encoding)
if err != nil {
t.Fatal(err)
Expand All @@ -43,11 +43,12 @@ func TestLen(t *testing.T) {
t.Errorf("chunk type %s should have %d samples, had %d", c.Encoding(), i, c.Len())
}

cs, _ := c.Add(model.SamplePair{
cs, err := c.Add(model.SamplePair{
Timestamp: model.Time(i),
Value: model.SampleValue(i),
})
c = cs[0]
require.NoError(t, err)
require.Nil(t, cs)
}
}
}
Expand Down Expand Up @@ -95,13 +96,12 @@ func mkChunk(t *testing.T, encoding Encoding, samples int) Chunk {
require.NoError(t, err)

for i := 0; i < samples; i++ {
chunks, err := chunk.Add(model.SamplePair{
newChunk, err := chunk.Add(model.SamplePair{
Timestamp: model.Time(i * step),
Value: model.SampleValue(i),
})
require.NoError(t, err)
require.Len(t, chunks, 1)
chunk = chunks[0]
require.Nil(t, newChunk)
}

return chunk
Expand Down
Loading

0 comments on commit 7910fa4

Please sign in to comment.