Skip to content

Commit

Permalink
add support for slicing of chunk in encodings (grafana#3472)
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
  • Loading branch information
sandeepsukhani authored Nov 10, 2020
1 parent 0261243 commit 8680148
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 43 deletions.
43 changes: 6 additions & 37 deletions chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,12 @@ import (
"github.com/cortexproject/cortex/pkg/prom1/storage/metric"
)

// Errors that decode can return
const (
ErrInvalidChecksum = errs.Error("invalid chunk checksum")
ErrWrongMetadata = errs.Error("wrong chunk metadata")
ErrMetadataLength = errs.Error("chunk metadata wrong length")
ErrDataLength = errs.Error("chunk data wrong length")
ErrSliceOutOfRange = errs.Error("chunk can't be sliced out of its data range")
ErrSliceNoDataInRange = errs.Error("chunk has no data for given range to slice")
ErrSliceChunkOverflow = errs.Error("slicing should not overflow a chunk")
ErrInvalidChecksum = errs.Error("invalid chunk checksum")
ErrWrongMetadata = errs.Error("wrong chunk metadata")
ErrMetadataLength = errs.Error("chunk metadata wrong length")
ErrDataLength = errs.Error("chunk data wrong length")
ErrSliceOutOfRange = errs.Error("chunk can't be sliced out of its data range")
)

var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
Expand Down Expand Up @@ -338,39 +335,11 @@ func (c *Chunk) Slice(from, through model.Time) (*Chunk, error) {
return nil, ErrSliceOutOfRange
}

itr := c.Data.NewIterator(nil)
if !itr.FindAtOrAfter(from) {
return nil, ErrSliceNoDataInRange
}

pc, err := prom_chunk.NewForEncoding(c.Data.Encoding())
pc, err := c.Data.Rebound(from, through)
if err != nil {
return nil, err
}

for !itr.Value().Timestamp.After(through) {
oc, err := pc.Add(itr.Value())
if err != nil {
return nil, err
}

if oc != nil {
return nil, ErrSliceChunkOverflow
}
if !itr.Scan() {
break
}
}

err = itr.Err()
if err != nil {
return nil, err
}

if pc.Len() == 0 {
return nil, ErrSliceNoDataInRange
}

nc := NewChunk(c.UserID, c.Fingerprint, c.Metric, pc, from, through)
return &nc, nil
}
Expand Down
5 changes: 3 additions & 2 deletions chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"

"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/chunk/encoding"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/extract"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand Down Expand Up @@ -684,7 +685,7 @@ func (c *baseStore) reboundChunk(ctx context.Context, userID, chunkID string, pa
var newChunks []*Chunk
if partiallyDeletedInterval.Start > chunk.From {
newChunk, err := chunk.Slice(chunk.From, partiallyDeletedInterval.Start-1)
if err != nil && err != ErrSliceNoDataInRange {
if err != nil && err != encoding.ErrSliceNoDataInRange {
return errors.Wrapf(err, "when slicing chunk for interval %d - %d", chunk.From, partiallyDeletedInterval.Start-1)
}

Expand All @@ -695,7 +696,7 @@ func (c *baseStore) reboundChunk(ctx context.Context, userID, chunkID string, pa

if partiallyDeletedInterval.End < chunk.Through {
newChunk, err := chunk.Slice(partiallyDeletedInterval.End+1, chunk.Through)
if err != nil && err != ErrSliceNoDataInRange {
if err != nil && err != encoding.ErrSliceNoDataInRange {
return errors.Wrapf(err, "when slicing chunk for interval %d - %d", partiallyDeletedInterval.End+1, chunk.Through)
}

Expand Down
2 changes: 1 addition & 1 deletion chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func TestChunk_Slice(t *testing.T) {
{
name: "slice no data in range",
sliceRange: model.Interval{Start: chunkStartTime.Add(time.Second), End: chunkStartTime.Add(10 * time.Second)},
err: ErrSliceNoDataInRange,
err: encoding.ErrSliceNoDataInRange,
},
{
name: "slice interval not aligned with sample intervals",
Expand Down
4 changes: 4 additions & 0 deletions encoding/bigchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ func (b *bigchunk) Slice(start, end model.Time) Chunk {
}
}

// Rebound implements Chunk. It delegates to reboundChunk, which copies the
// samples of b that fall within [start, end] (inclusive) into a new chunk.
func (b *bigchunk) Rebound(start, end model.Time) (Chunk, error) {
return reboundChunk(b, start, end)
}

type writer struct {
io.Writer
}
Expand Down
54 changes: 51 additions & 3 deletions encoding/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@ import (
"sort"

"github.com/prometheus/common/model"
errs "github.com/weaveworks/common/errors"

"github.com/cortexproject/cortex/pkg/prom1/storage/metric"
)

// ChunkLen is the length of a chunk in bytes.
const ChunkLen = 1024
const (
// ChunkLen is the length of a chunk in bytes.
ChunkLen = 1024

// ErrSliceNoDataInRange is returned by reboundChunk when the source chunk
// has no sample at or after the requested start, or when no sample ended
// up in the new chunk.
ErrSliceNoDataInRange = errs.Error("chunk has no data for given range to slice")
// ErrSliceChunkOverflow is returned by reboundChunk when adding a sample
// to the new chunk produces an overflow chunk.
ErrSliceChunkOverflow = errs.Error("slicing should not overflow a chunk")
)

var (
errChunkBoundsExceeded = errors.New("attempted access outside of chunk boundaries")
Expand All @@ -50,10 +56,15 @@ type Chunk interface {
Encoding() Encoding
Utilization() float64

// Slice returns a smaller chunk the includes all samples between start and end
// Slice returns a smaller chunk that includes all samples between start and end
(inclusive). It may overestimate. On some encodings it is a no-op.
Slice(start, end model.Time) Chunk

// Rebound returns a smaller chunk that includes all samples between start and end (inclusive).
// We do not want to change existing Slice implementations because
// it is built specifically for query optimization and is a noop for some of the encodings.
Rebound(start, end model.Time) (Chunk, error)

// Len returns the number of samples in the chunk. Implementations may be
// expensive.
Len() int
Expand Down Expand Up @@ -246,3 +257,40 @@ func (it *indexAccessingChunkIterator) Batch(size int) Batch {
func (it *indexAccessingChunkIterator) Err() error {
return it.acc.err()
}

// reboundChunk builds a new chunk, using the same encoding as c, that holds
// only the samples of c whose timestamps fall within [start, end] (inclusive).
//
// It returns ErrSliceNoDataInRange when no sample of c lies in that range,
// ErrSliceChunkOverflow when adding a sample to the new chunk yields an
// overflow chunk, and otherwise any error reported by the iterator, by
// NewForEncoding or by Add.
func reboundChunk(c Chunk, start, end model.Time) (Chunk, error) {
	itr := c.NewIterator(nil)
	if !itr.FindAtOrAfter(start) {
		// A false result alone cannot tell "no sample at or after start" apart
		// from an iterator failure, so consult Err before reporting an empty
		// range; callers treat ErrSliceNoDataInRange as a benign outcome.
		if err := itr.Err(); err != nil {
			return nil, err
		}
		return nil, ErrSliceNoDataInRange
	}

	pc, err := NewForEncoding(c.Encoding())
	if err != nil {
		return nil, err
	}

	// The iterator is already positioned on the first candidate sample; copy
	// samples until one lies past end or the iterator is exhausted.
	for sample := itr.Value(); !sample.Timestamp.After(end); sample = itr.Value() {
		overflow, addErr := pc.Add(sample)
		if addErr != nil {
			return nil, addErr
		}

		// A rebounded chunk must fit in a single chunk of the same encoding.
		if overflow != nil {
			return nil, ErrSliceChunkOverflow
		}
		if !itr.Scan() {
			break
		}
	}

	if err := itr.Err(); err != nil {
		return nil, err
	}

	// The first sample at or after start may already be past end.
	if pc.Len() == 0 {
		return nil, ErrSliceNoDataInRange
	}

	return pc, nil
}
72 changes: 72 additions & 0 deletions encoding/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func TestChunk(t *testing.T) {
t.Run(fmt.Sprintf("testChunkBatch/%s/%d", tc.encoding.String(), samples), func(t *testing.T) {
testChunkBatch(t, tc.encoding, samples)
})

t.Run(fmt.Sprintf("testChunkRebound/%s/%d", tc.encoding.String(), samples), func(t *testing.T) {
testChunkRebound(t, tc.encoding, samples)
})
}
}
}
Expand Down Expand Up @@ -220,3 +224,71 @@ func testChunkBatch(t *testing.T, encoding Encoding, samples int) {
require.False(t, iter.Scan())
require.NoError(t, iter.Err())
}

// testChunkRebound verifies Rebound for the given encoding: for each
// sub-range it builds a chunk with the requested number of samples, rebounds
// it, and checks that the new chunk holds exactly the original samples that
// fall inside [sliceFrom, sliceTo]. When a case sets err, Rebound must return
// that error instead.
func testChunkRebound(t *testing.T, encoding Encoding, samples int) {
	for _, tc := range []struct {
		name               string
		sliceFrom, sliceTo model.Time
		err                error
	}{
		{
			name:      "slice first half",
			sliceFrom: 0,
			sliceTo:   model.Time((samples / 2) * step),
		},
		{
			name:      "slice second half",
			sliceFrom: model.Time((samples / 2) * step),
			sliceTo:   model.Time((samples - 1) * step),
		},
		{
			name:      "slice in the middle",
			sliceFrom: model.Time(int(float64(samples)*0.25) * step),
			sliceTo:   model.Time(int(float64(samples)*0.75) * step),
		},
		{
			name:      "slice no data in range",
			err:       ErrSliceNoDataInRange,
			sliceFrom: model.Time((samples + 1) * step),
			sliceTo:   model.Time(samples * 2 * step),
		},
		{
			name:      "slice interval not aligned with sample intervals",
			sliceFrom: model.Time(0 + step/2),
			sliceTo:   model.Time(samples * step).Add(time.Duration(-step / 2)),
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			originalChunk := mkChunk(t, encoding, samples)

			newChunk, err := originalChunk.Rebound(tc.sliceFrom, tc.sliceTo)
			if tc.err != nil {
				require.Equal(t, tc.err, err)
				return
			}
			require.NoError(t, err)

			// Position both iterators on their first sample and fail loudly if
			// either has nothing to offer, instead of comparing whatever Value
			// returns on an unpositioned iterator.
			chunkItr := originalChunk.NewIterator(nil)
			require.True(t, chunkItr.FindAtOrAfter(tc.sliceFrom), "original chunk has no sample at or after slice start")

			newChunkItr := newChunk.NewIterator(nil)
			require.True(t, newChunkItr.Scan(), "rebounded chunk is empty")

			for {
				require.Equal(t, chunkItr.Value(), newChunkItr.Value())

				originalChunksHasMoreSamples := chunkItr.Scan()
				newChunkHasMoreSamples := newChunkItr.Scan()

				// originalChunk and newChunk both should end at same time or newChunk should end before or at slice end time
				if !originalChunksHasMoreSamples || chunkItr.Value().Timestamp > tc.sliceTo {
					require.False(t, newChunkHasMoreSamples)
					break
				}

				require.True(t, newChunkHasMoreSamples)
			}

			// A false Scan must mean exhaustion, not an iterator failure.
			require.NoError(t, chunkItr.Err())
			require.NoError(t, newChunkItr.Err())
		})
	}
}
4 changes: 4 additions & 0 deletions encoding/doubledelta.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ func (c *doubleDeltaEncodedChunk) Slice(_, _ model.Time) Chunk {
return c
}

// Rebound implements Chunk. It delegates to reboundChunk, which copies the
// samples of c that fall within [start, end] (inclusive) into a new chunk.
func (c *doubleDeltaEncodedChunk) Rebound(start, end model.Time) (Chunk, error) {
return reboundChunk(c, start, end)
}

// Marshal implements chunk.
func (c doubleDeltaEncodedChunk) Marshal(w io.Writer) error {
if len(c) > math.MaxUint16 {
Expand Down
4 changes: 4 additions & 0 deletions encoding/varbit.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,10 @@ func (c *varbitChunk) Slice(_, _ model.Time) Chunk {
return c
}

// Rebound implements Chunk. It delegates to reboundChunk, which copies the
// samples of c that fall within [start, end] (inclusive) into a new chunk.
func (c *varbitChunk) Rebound(start, end model.Time) (Chunk, error) {
return reboundChunk(c, start, end)
}

// Marshal implements chunk.
func (c varbitChunk) Marshal(w io.Writer) error {
size := c.Size()
Expand Down

0 comments on commit 8680148

Please sign in to comment.