diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go
index 6a96dc5f530f..774fe502a275 100644
--- a/pkg/chunkenc/memchunk.go
+++ b/pkg/chunkenc/memchunk.go
@@ -67,9 +67,6 @@ type MemChunk struct {
 	// the chunk format default to v2
 	format   byte
 	encoding Encoding
-
-	readers ReaderPool
-	writers WriterPool
 }
 
 type block struct {
@@ -81,8 +78,6 @@ type block struct {
 
 	offset           int // The offset of the block in the chunk.
 	uncompressedSize int // Total uncompressed size in bytes when the chunk is cut.
-
-	readers ReaderPool
 }
 
 // This block holds the un-compressed entries. Once it has enough data, this is
@@ -161,8 +156,6 @@ func NewMemChunk(enc Encoding, blockSize, targetSize int) *MemChunk {
 
 		format:   chunkFormatV2,
 		encoding: enc,
-		writers:  getWriterPool(enc),
-		readers:  getReaderPool(enc),
 	}
 
 	return c
@@ -188,7 +181,7 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) {
 	bc.format = version
 	switch version {
 	case chunkFormatV1:
-		bc.readers, bc.writers = &Gzip, &Gzip
+		bc.encoding = EncGZIP
 	case chunkFormatV2:
 		// format v2 has a byte for block encoding.
 		enc := Encoding(db.byte())
@@ -196,7 +189,6 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) {
 			return nil, errors.Wrap(db.err(), "verifying encoding")
 		}
 		bc.encoding = enc
-		bc.readers, bc.writers = getReaderPool(enc), getWriterPool(enc)
 	default:
 		return nil, errors.Errorf("invalid version %d", version)
 	}
@@ -215,9 +207,7 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) {
 	bc.blocks = make([]block, 0, num)
 
 	for i := 0; i < num; i++ {
-		blk := block{
-			readers: bc.readers,
-		}
+		var blk block
 
 		// Read #entries.
 		blk.numEntries = db.uvarint()
@@ -430,13 +420,12 @@ func (c *MemChunk) cut() error {
 		return nil
 	}
 
-	b, err := c.head.serialise(c.writers)
+	b, err := c.head.serialise(getWriterPool(c.encoding))
 	if err != nil {
 		return err
 	}
 
 	c.blocks = append(c.blocks, block{
-		readers:    c.readers,
 		b:          b,
 		numEntries: len(c.head.entries),
 		mint:       c.head.mint,
@@ -483,7 +472,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
 		if maxt < b.mint || b.maxt < mint {
 			continue
 		}
-		its = append(its, b.Iterator(ctx, filter))
+		its = append(its, encBlock{c.encoding, b}.Iterator(ctx, filter))
 	}
 
 	if !c.head.isEmpty() {
@@ -512,7 +501,7 @@ func (c *MemChunk) SampleIterator(ctx context.Context, mintT, maxtT time.Time, f
 		if maxt < b.mint || b.maxt < mint {
 			continue
 		}
-		its = append(its, b.SampleIterator(ctx, filter, extractor))
+		its = append(its, encBlock{c.encoding, b}.SampleIterator(ctx, filter, extractor))
 	}
 
 	if !c.head.isEmpty() {
@@ -533,24 +522,33 @@ func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block {
 
 	for _, b := range c.blocks {
 		if maxt >= b.mint && b.maxt >= mint {
-			blocks = append(blocks, b)
+			blocks = append(blocks, encBlock{c.encoding, b})
 		}
 	}
 	return blocks
 }
 
-func (b block) Iterator(ctx context.Context, filter logql.LineFilter) iter.EntryIterator {
+// encBlock is an internal wrapper for a block, mainly to avoid binding an encoding in a block itself.
+// This may seem roundabout, but the encoding is already a field on the parent MemChunk type. encBlock
+// then allows us to bind a decoding context to a block when requested, but otherwise helps reduce the
+// chances of chunk<>block encoding drift in the codebase as the latter is parameterized by the former.
+type encBlock struct {
+	enc Encoding
+	block
+}
+
+func (b encBlock) Iterator(ctx context.Context, filter logql.LineFilter) iter.EntryIterator {
 	if len(b.b) == 0 {
 		return emptyIterator
 	}
-	return newEntryIterator(ctx, b.readers, b.b, filter)
+	return newEntryIterator(ctx, getReaderPool(b.enc), b.b, filter)
 }
 
-func (b block) SampleIterator(ctx context.Context, filter logql.LineFilter, extractor logql.SampleExtractor) iter.SampleIterator {
+func (b encBlock) SampleIterator(ctx context.Context, filter logql.LineFilter, extractor logql.SampleExtractor) iter.SampleIterator {
 	if len(b.b) == 0 {
 		return iter.NoopIterator
 	}
-	return newSampleIterator(ctx, b.readers, b.b, filter, extractor)
+	return newSampleIterator(ctx, getReaderPool(b.enc), b.b, filter, extractor)
 }
 
 func (b block) Offset() int {
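
For review context, here is a standalone sketch of the pattern this diff moves to: the chunk owns the `Encoding`, blocks stay encoding-free, and a thin `encBlock` wrapper binds the two only when a decoding context is actually needed. This is not the Loki code; `Encoding`, `block`, `chunk`, `getReader`, and `Blocks` below are simplified stand-ins (gzip-only, no reader/writer pools) used purely to illustrate the idea.

```go
// Standalone sketch (not the Loki implementation): a parent "chunk" owns the
// encoding, and child "blocks" are wrapped with it only when read.
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// Encoding enumerates compression codecs; only "none" and gzip are sketched.
type Encoding byte

const (
	EncNone Encoding = iota
	EncGZIP
)

// getReader returns a decompressing reader for the given encoding.
// The real code looks readers up from pools; this sketch constructs one
// per call to keep the example small.
func getReader(enc Encoding, r io.Reader) (io.Reader, error) {
	switch enc {
	case EncGZIP:
		return gzip.NewReader(r)
	default:
		return r, nil
	}
}

// block holds only compressed bytes; it carries no encoding of its own.
type block struct {
	b []byte
}

// encBlock binds the parent's encoding to a block at read time.
type encBlock struct {
	enc Encoding
	block
}

// Read decompresses the embedded block using the bound encoding.
func (e encBlock) Read() ([]byte, error) {
	r, err := getReader(e.enc, bytes.NewReader(e.b))
	if err != nil {
		return nil, err
	}
	return io.ReadAll(r)
}

// chunk owns the encoding for all of its blocks.
type chunk struct {
	encoding Encoding
	blocks   []block
}

// Blocks hands out blocks wrapped with the chunk's encoding, so callers never
// see a block whose decoding context could drift from its parent chunk.
func (c chunk) Blocks() []encBlock {
	out := make([]encBlock, 0, len(c.blocks))
	for _, b := range c.blocks {
		out = append(out, encBlock{c.encoding, b})
	}
	return out
}

func main() {
	// Compress one payload so the gzip path is exercised.
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	_, _ = w.Write([]byte("hello, blocks"))
	_ = w.Close()

	c := chunk{encoding: EncGZIP, blocks: []block{{b: buf.Bytes()}}}
	for _, eb := range c.Blocks() {
		data, err := eb.Read()
		if err != nil {
			panic(err)
		}
		fmt.Println(string(data))
	}
}
```

Embedding `block` in `encBlock` keeps existing field access working (`b.b`, `b.mint`, and so on); only the read path needs the encoding, which is looked up from the parent at call time instead of being stored on every block and chunk.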