Skip to content

Commit

Permalink
removes r/w pools from block/chunk types (#2711)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Oct 12, 2020
1 parent e644095 commit b878716
Showing 1 changed file with 19 additions and 21 deletions.
40 changes: 19 additions & 21 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ type MemChunk struct {
// the chunk format default to v2
format byte
encoding Encoding

readers ReaderPool
writers WriterPool
}

type block struct {
Expand All @@ -81,8 +78,6 @@ type block struct {

offset int // The offset of the block in the chunk.
uncompressedSize int // Total uncompressed size in bytes when the chunk is cut.

readers ReaderPool
}

// This block holds the un-compressed entries. Once it has enough data, this is
Expand Down Expand Up @@ -161,8 +156,6 @@ func NewMemChunk(enc Encoding, blockSize, targetSize int) *MemChunk {
format: chunkFormatV2,

encoding: enc,
writers: getWriterPool(enc),
readers: getReaderPool(enc),
}

return c
Expand All @@ -188,15 +181,14 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) {
bc.format = version
switch version {
case chunkFormatV1:
bc.readers, bc.writers = &Gzip, &Gzip
bc.encoding = EncGZIP
case chunkFormatV2:
// format v2 has a byte for block encoding.
enc := Encoding(db.byte())
if db.err() != nil {
return nil, errors.Wrap(db.err(), "verifying encoding")
}
bc.encoding = enc
bc.readers, bc.writers = getReaderPool(enc), getWriterPool(enc)
default:
return nil, errors.Errorf("invalid version %d", version)
}
Expand All @@ -215,9 +207,7 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) {
bc.blocks = make([]block, 0, num)

for i := 0; i < num; i++ {
blk := block{
readers: bc.readers,
}
var blk block
// Read #entries.
blk.numEntries = db.uvarint()

Expand Down Expand Up @@ -430,13 +420,12 @@ func (c *MemChunk) cut() error {
return nil
}

b, err := c.head.serialise(c.writers)
b, err := c.head.serialise(getWriterPool(c.encoding))
if err != nil {
return err
}

c.blocks = append(c.blocks, block{
readers: c.readers,
b: b,
numEntries: len(c.head.entries),
mint: c.head.mint,
Expand Down Expand Up @@ -483,7 +472,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
if maxt < b.mint || b.maxt < mint {
continue
}
its = append(its, b.Iterator(ctx, filter))
its = append(its, encBlock{c.encoding, b}.Iterator(ctx, filter))
}

if !c.head.isEmpty() {
Expand Down Expand Up @@ -512,7 +501,7 @@ func (c *MemChunk) SampleIterator(ctx context.Context, mintT, maxtT time.Time, f
if maxt < b.mint || b.maxt < mint {
continue
}
its = append(its, b.SampleIterator(ctx, filter, extractor))
its = append(its, encBlock{c.encoding, b}.SampleIterator(ctx, filter, extractor))
}

if !c.head.isEmpty() {
Expand All @@ -533,24 +522,33 @@ func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block {

for _, b := range c.blocks {
if maxt >= b.mint && b.maxt >= mint {
blocks = append(blocks, b)
blocks = append(blocks, encBlock{c.encoding, b})
}
}
return blocks
}

func (b block) Iterator(ctx context.Context, filter logql.LineFilter) iter.EntryIterator {
// encBlock is an internal wrapper for a block, mainly to avoid binding an encoding in a block itself.
// This may seem roundabout, but the encoding is already a field on the parent MemChunk type. encBlock
// then allows us to bind a decoding context to a block when requested, but otherwise helps reduce the
// chances of chunk<>block encoding drift in the codebase as the latter is parameterized by the former.
type encBlock struct {
enc Encoding
block
}

func (b encBlock) Iterator(ctx context.Context, filter logql.LineFilter) iter.EntryIterator {
if len(b.b) == 0 {
return emptyIterator
}
return newEntryIterator(ctx, b.readers, b.b, filter)
return newEntryIterator(ctx, getReaderPool(b.enc), b.b, filter)
}

func (b block) SampleIterator(ctx context.Context, filter logql.LineFilter, extractor logql.SampleExtractor) iter.SampleIterator {
func (b encBlock) SampleIterator(ctx context.Context, filter logql.LineFilter, extractor logql.SampleExtractor) iter.SampleIterator {
if len(b.b) == 0 {
return iter.NoopIterator
}
return newSampleIterator(ctx, b.readers, b.b, filter, extractor)
return newSampleIterator(ctx, getReaderPool(b.enc), b.b, filter, extractor)
}

func (b block) Offset() int {
Expand Down

0 comments on commit b878716

Please sign in to comment.