From 3ad7eacfdd7cae71dbd4eb3f8154f47b1717bdb9 Mon Sep 17 00:00:00 2001 From: Mark Knapp Date: Fri, 23 Nov 2018 11:34:14 -0500 Subject: [PATCH 01/10] Added storage size based retention method and new metrics Signed-off-by: Mark Knapp --- block.go | 39 ++++++- block_test.go | 18 +-- chunks/chunks.go | 9 ++ compact.go | 2 +- db.go | 285 ++++++++++++++++++++++++++++----------------- db_test.go | 130 ++++++++++++++------- index/index.go | 5 + querier_test.go | 4 +- tombstones.go | 36 ++++-- tombstones_test.go | 2 +- 10 files changed, 357 insertions(+), 173 deletions(-) diff --git a/block.go b/block.go index e5a66bd9..21ae737b 100644 --- a/block.go +++ b/block.go @@ -21,6 +21,8 @@ import ( "path/filepath" "sync" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/tsdb/chunkenc" @@ -140,6 +142,12 @@ type Appendable interface { Appender() Appender } +// SizeReader returns the size of the object in bytes. +type SizeReader interface { + //Size returns the size in bytes. + Size() int64 +} + // BlockMeta provides meta information about a block. type BlockMeta struct { // Unique identifier for the block and its contents. Changes on compaction. @@ -166,6 +174,7 @@ type BlockStats struct { NumSeries uint64 `json:"numSeries,omitempty"` NumChunks uint64 `json:"numChunks,omitempty"` NumTombstones uint64 `json:"numTombstones,omitempty"` + NumBytes int64 `json:"numBytes,omitempty"` } // BlockDesc describes a block by ULID and time range. @@ -257,7 +266,10 @@ type Block struct { // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used // to instantiate chunk structs. -func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { +func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error) { + if logger == nil { + logger = log.NewNopLogger() + } meta, err := readMetaFile(dir) if err != nil { return nil, err @@ -272,11 +284,20 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { return nil, err } - tr, err := readTombstones(dir) + tr, tsr, err := readTombstones(dir) if err != nil { return nil, err } + // TODO refactor to set this at block creation time as + // that would be the logical place for a block size to be calculated. + bs := blockSize(cr, ir, tsr) + meta.Stats.NumBytes = bs + err = writeMetaFile(dir, meta) + if err != nil { + level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err) + } + pb := &Block{ dir: dir, meta: *meta, @@ -288,6 +309,17 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { return pb, nil } +func blockSize(rr ...SizeReader) int64 { + var total int64 + for _, r := range rr { + if r != nil { + t := r.Size() + total += t + } + } + return total +} + // Close closes the on-disk block. It blocks as long as there are readers reading from the block. func (pb *Block) Close() error { pb.mtx.Lock() @@ -315,6 +347,9 @@ func (pb *Block) Dir() string { return pb.dir } // Meta returns meta information about the block. func (pb *Block) Meta() BlockMeta { return pb.meta } +// Size returns the number of bytes that the block takes up. +func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes } + // ErrClosing is returned when a block is in the process of being closed. 
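// A note on how the new pieces compose: blockSize deliberately tolerates nil
// SizeReaders, so a block whose tombstones file is absent still reports the
// sum of whatever readers it does have. A standalone, stdlib-only sketch —
// the fixedSize stub stands in for the index/chunks/tombstones readers and
// is illustrative, not part of this patch:

package main

import "fmt"

// SizeReader mirrors the interface introduced in block.go.
type SizeReader interface {
	Size() int64
}

// fixedSize is a hypothetical reader with a known byte size.
type fixedSize int64

func (f fixedSize) Size() int64 { return int64(f) }

// blockSize sums all non-nil readers, as in the patch.
func blockSize(rr ...SizeReader) int64 {
	var total int64
	for _, r := range rr {
		if r != nil {
			total += r.Size()
		}
	}
	return total
}

func main() {
	var tombstones SizeReader // nil: no tombstones file on disk
	fmt.Println(blockSize(fixedSize(1<<20), fixedSize(4096), tombstones)) // 1052672
}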
var ErrClosing = errors.New("block is closing") diff --git a/block_test.go b/block_test.go index 61666fe3..4738cb5d 100644 --- a/block_test.go +++ b/block_test.go @@ -53,7 +53,7 @@ func TestSetCompactionFailed(t *testing.T) { testutil.Equals(t, true, b.meta.Compaction.Failed) testutil.Ok(t, b.Close()) - b, err = OpenBlock(tmpdir, nil) + b, err = OpenBlock(nil, tmpdir, nil) testutil.Ok(t, err) testutil.Equals(t, true, b.meta.Compaction.Failed) } @@ -72,13 +72,14 @@ func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block { testutil.Ok(t, writeTombstoneFile(dir, newMemTombstones())) - b, err := OpenBlock(dir, nil) + b, err := OpenBlock(nil, dir, nil) testutil.Ok(t, err) return b } -// createPopulatedBlock creates a block with nSeries series, and nSamples samples. -func createPopulatedBlock(tb testing.TB, dir string, nSeries, nSamples int) *Block { +// createPopulatedBlock creates a block with nSeries series, +// filled with samples between blockMint and blockMaxt. +func createPopulatedBlock(tb testing.TB, dir string, nSeries int, blockMint, blockMaxt int64) *Block { head, err := NewHead(nil, nil, nil, 2*60*60*1000) testutil.Ok(tb, err) defer head.Close() @@ -87,17 +88,16 @@ func createPopulatedBlock(tb testing.TB, dir string, nSeries, nSamples int) *Blo testutil.Ok(tb, err) refs := make([]uint64, nSeries) - for n := 0; n < nSamples; n++ { + for ; blockMint <= blockMaxt; blockMint++ { app := head.Appender() - ts := n * 1000 for i, lbl := range lbls { if refs[i] != 0 { - err := app.AddFast(refs[i], int64(ts), rand.Float64()) + err := app.AddFast(refs[i], int64(blockMint), rand.Float64()) if err == nil { continue } } - ref, err := app.Add(lbl, int64(ts), rand.Float64()) + ref, err := app.Add(lbl, blockMint, rand.Float64()) testutil.Ok(tb, err) refs[i] = ref } @@ -113,7 +113,7 @@ func createPopulatedBlock(tb testing.TB, dir string, nSeries, nSamples int) *Blo ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime(), nil) testutil.Ok(tb, err) - blk, err := OpenBlock(filepath.Join(dir, ulid.String()), nil) + blk, err := OpenBlock(nil, filepath.Join(dir, ulid.String()), nil) testutil.Ok(tb, err) return blk } diff --git a/chunks/chunks.go b/chunks/chunks.go index 5eab2398..d282f60a 100644 --- a/chunks/chunks.go +++ b/chunks/chunks.go @@ -345,6 +345,15 @@ func (s *Reader) Close() error { return closeAll(s.cs...) } +// Size returns the size of the chunks. +func (s *Reader) Size() int64 { + var size int64 + for _, f := range s.bs { + size += int64(f.Len()) + } + return size +} + func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) { var ( seq = int(ref >> 32) diff --git a/compact.go b/compact.go index f8e6ff54..49d4e586 100644 --- a/compact.go +++ b/compact.go @@ -347,7 +347,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u if b == nil { var err error - b, err = OpenBlock(d, c.chunkPool) + b, err = OpenBlock(c.logger, d, c.chunkPool) if err != nil { return uid, err } diff --git a/db.go b/db.go index 9b92823e..03a11dd0 100644 --- a/db.go +++ b/db.go @@ -58,6 +58,10 @@ type Options struct { // Duration of persisted data to keep. RetentionDuration uint64 + // Maximum number of bytes in blocks to be retained. + // 0 or less means disabled. + MaxBytes int64 + // The sizes of the Blocks. 
BlockRanges []int64 @@ -127,11 +131,12 @@ type dbMetrics struct { reloads prometheus.Counter reloadsFailed prometheus.Counter compactionsTriggered prometheus.Counter + timeRetentionCount prometheus.Counter compactionsSkipped prometheus.Counter - cutoffs prometheus.Counter - cutoffsFailed prometheus.Counter startTime prometheus.GaugeFunc tombCleanTimer prometheus.Histogram + blocksBytes prometheus.Gauge + sizeRetentionCount prometheus.Counter } func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { @@ -170,18 +175,14 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Name: "prometheus_tsdb_compactions_triggered_total", Help: "Total number of triggered compactions for the partition.", }) + m.timeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_time_retentions_total", + Help: "The number of times that blocks were deleted because the maximum time limit was exceeded.", + }) m.compactionsSkipped = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_compactions_skipped_total", Help: "Total number of skipped compactions due to disabled auto compaction.", }) - m.cutoffs = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_retention_cutoffs_total", - Help: "Number of times the database cut off block data from disk.", - }) - m.cutoffsFailed = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_retention_cutoffs_failures_total", - Help: "Number of times the database failed to cut off block data from disk.", - }) m.startTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "prometheus_tsdb_lowest_timestamp", Help: "Lowest timestamp value stored in the database.", @@ -197,6 +198,14 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Name: "prometheus_tsdb_tombstone_cleanup_seconds", Help: "The time taken to recompact blocks to remove tombstones.", }) + m.blocksBytes = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_storage_blocks_bytes_total", + Help: "The number of bytes that are currently used for local storage by all blocks.", + }) + m.sizeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_size_retentions_total", + Help: "The number of times that blocks were deleted because the maximum number of bytes was exceeded.", + }) if r != nil { r.MustRegister( @@ -204,11 +213,12 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { m.symbolTableSize, m.reloads, m.reloadsFailed, - m.cutoffs, - m.cutoffsFailed, + m.timeRetentionCount, m.compactionsTriggered, m.startTime, m.tombCleanTimer, + m.blocksBytes, + m.sizeRetentionCount, ) } return m @@ -327,25 +337,6 @@ func (db *DB) run() { } } -func (db *DB) beyondRetention(meta *BlockMeta) bool { - if db.opts.RetentionDuration == 0 { - return false - } - - db.mtx.RLock() - blocks := db.blocks[:] - db.mtx.RUnlock() - - if len(blocks) == 0 { - return false - } - - last := blocks[len(db.blocks)-1] - mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration) - - return meta.MaxTime < mint -} - // Appender opens a new appender against the database. func (db *DB) Appender() Appender { return dbAppender{db: db, Appender: db.head.Appender()} @@ -460,8 +451,7 @@ func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { return nil, false } -// reload on-disk blocks and trigger head truncation if new blocks appeared. It takes -// a list of block directories which should be deleted during reload. +// reload blocks and trigger head truncation if new blocks appeared. 
// Blocks that are obsolete due to replacement or retention will be deleted. func (db *DB) reload() (err error) { defer func() { @@ -471,110 +461,191 @@ func (db *DB) reload() (err error) { db.metrics.reloads.Inc() }() - dirs, err := blockDirs(db.dir) + loadable, corrupted, err := db.openBlocks() if err != nil { - return errors.Wrap(err, "find blocks") - } - // We delete old blocks that have been superseded by new ones by gathering all parents - // from existing blocks. Those parents all have newer replacements and can be safely deleted - // after we loaded the other blocks. - // This makes us resilient against the process crashing towards the end of a compaction. - // Creation of a new block and deletion of its parents cannot happen atomically. By creating - // blocks with their parents, we can pick up the deletion where it left off during a crash. - var ( - blocks []*Block - corrupted = map[ulid.ULID]error{} - opened = map[ulid.ULID]struct{}{} - deleteable = map[ulid.ULID]struct{}{} - ) - for _, dir := range dirs { - meta, err := readMetaFile(dir) - if err != nil { - // The block was potentially in the middle of being deleted during a crash. - // Skip it since we may delete it properly further down again. - level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir) + return err + } + deletable := db.deletableBlocks(loadable) - ulid, err2 := ulid.Parse(filepath.Base(dir)) - if err2 != nil { - level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) - continue - } - corrupted[ulid] = err - continue + for ulid, err := range corrupted { + if _, ok := deletable[ulid]; !ok { + return errors.Wrap(err, "unexpected corrupted block") } - if db.beyondRetention(meta) { - deleteable[meta.ULID] = struct{}{} + } + + // All deletable blocks should not be loaded. + var ( + bb []*Block + blocksSize int64 + ) + for _, block := range loadable { + if _, ok := deletable[block.Meta().ULID]; ok { + deletable[block.Meta().ULID] = block continue } - for _, b := range meta.Compaction.Parents { - deleteable[b.ULID] = struct{}{} - } + bb = append(bb, block) + blocksSize += block.Size() + } - // Blocks we failed to open should all be those we are want to delete anyway. - for c, err := range corrupted { - if _, ok := deleteable[c]; !ok { - return errors.Wrapf(err, "unexpected corrupted block %s", c) + loadable = bb + db.metrics.blocksBytes.Set(float64(blocksSize)) + + sort.Slice(loadable, func(i, j int) bool { + return loadable[i].Meta().MaxTime < loadable[j].Meta().MaxTime + }) + if err := validateBlockSequence(loadable); err != nil { + return errors.Wrap(err, "invalid block sequence") + } + + // Swap new blocks first for subsequently created readers to be seen. + db.mtx.Lock() + oldBlocks := db.blocks + db.blocks = loadable + db.mtx.Unlock() + + for _, b := range oldBlocks { + if _, ok := deletable[b.Meta().ULID]; ok { + deletable[b.Meta().ULID] = b } } - // Load new blocks into memory. + + if err := db.deleteBlocks(deletable); err != nil { + return err + } + + // Garbage collect data in the head if the most recent persisted block + // covers data of its current time range. 
+ if len(loadable) == 0 { + return nil + } + + maxt := loadable[len(loadable)-1].Meta().MaxTime + + return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") +} + +func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err error) { + corrupted = make(map[ulid.ULID]error) + + dirs, err := blockDirs(db.dir) + if err != nil { + return nil, nil, errors.Wrap(err, "find blocks") + } + for _, dir := range dirs { - meta, err := readMetaFile(dir) + ulid, err := ulid.Parse(filepath.Base(dir)) if err != nil { - return errors.Wrapf(err, "read meta information %s", dir) - } - // Don't load blocks that are scheduled for deletion. - if _, ok := deleteable[meta.ULID]; ok { + level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) continue } + // See if we already have the block in memory or open it otherwise. - b, ok := db.getBlock(meta.ULID) + block, ok := db.getBlock(ulid) if !ok { - b, err = OpenBlock(dir, db.chunkPool) + block, err = OpenBlock(db.logger, dir, db.chunkPool) if err != nil { - return errors.Wrapf(err, "open block %s", dir) + corrupted[ulid] = err } + } - blocks = append(blocks, b) - opened[meta.ULID] = struct{}{} + blocks = append(blocks, block) } + + return blocks, corrupted, nil +} + +// deletableBlocks returns all blocks past retention policy +// and blocks superseded by parents after a compaction. +func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block { + deletable := make(map[ulid.ULID]*Block) + + // Delete old blocks that have been superseded by new ones by gathering their parents. + // Those parents all have newer replacements and + // can be safely deleted after loading the other blocks. + // This makes it resilient against the process crashing towards the end of a compaction. + // Creation of a new block and deletion of its parents cannot happen atomically. + // By creating blocks with their parents, we can pick up the deletion where it left off during a crash. + for _, block := range blocks { + for _, b := range block.Meta().Compaction.Parents { + deletable[b.ULID] = nil + } + } + + // Sort the blocks by time - newest to oldest (largest to smallest timestamp). + // This ensures that the retentions will remove the oldest blocks. sort.Slice(blocks, func(i, j int) bool { - return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime + return blocks[i].Meta().MaxTime > blocks[j].Meta().MaxTime }) - if err := validateBlockSequence(blocks); err != nil { - return errors.Wrap(err, "invalid block sequence") + + for ulid, block := range db.beyondTimeRetention(blocks) { + deletable[ulid] = block } - // Swap in new blocks first for subsequently created readers to be seen. - // Then close previous blocks, which may block for pending readers to complete. - db.mtx.Lock() - oldBlocks := db.blocks - db.blocks = blocks - db.mtx.Unlock() + for ulid, block := range db.beyondSizeRetention(blocks) { + deletable[ulid] = block + } - // Drop old blocks from memory. - for _, b := range oldBlocks { - if _, ok := opened[b.Meta().ULID]; ok { - continue + return deletable +} + +func (db *DB) beyondTimeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Block) { + deleteable = make(map[ulid.ULID]*Block) + if len(db.blocks) == 0 || db.opts.RetentionDuration == 0 { // Time retention is disabled or no blocks to work with. + return + } + + for i, block := range blocks { + // The difference between the first block and this block is larger than the retention period so + // any blocks after that are added as deleteable. 
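// To make the walk below concrete in isolation: blocks arrive sorted newest
// to oldest by MaxTime, and the first block that falls further behind the
// newest MaxTime than the retention window drags everything older out with
// it. A standalone sketch with toy metas — the meta type and ULID strings
// are illustrative, not the real structs:

package main

import (
	"fmt"
	"sort"
)

type meta struct {
	ulid    string
	maxTime int64
}

func beyondTimeRetention(blocks []meta, retention int64) map[string]struct{} {
	deletable := map[string]struct{}{}
	if len(blocks) == 0 || retention == 0 { // retention disabled or nothing to do
		return deletable
	}
	// Newest to oldest, mirroring the sort in deletableBlocks.
	sort.Slice(blocks, func(i, j int) bool { return blocks[i].maxTime > blocks[j].maxTime })
	for i, b := range blocks {
		if i > 0 && blocks[0].maxTime-b.maxTime > retention {
			for _, old := range blocks[i:] { // this block and all older ones
				deletable[old.ulid] = struct{}{}
			}
			break
		}
	}
	return deletable
}

func main() {
	blocks := []meta{{"a", 900}, {"b", 1500}, {"c", 2000}}
	// Window of 1000 behind the newest MaxTime (2000): only "a" (900) is out.
	fmt.Println(beyondTimeRetention(blocks, 1000)) // map[a:{}]
}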
+ if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > int64(db.opts.RetentionDuration) { + for _, b := range blocks[i:] { + deleteable[b.meta.ULID] = b + } + db.metrics.timeRetentionCount.Inc() + break } - if err := b.Close(); err != nil { - level.Warn(db.logger).Log("msg", "closing block failed", "err", err) + } + + return deleteable +} + +func (db *DB) beyondSizeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Block) { + deleteable = make(map[ulid.ULID]*Block) + if db.opts.MaxBytes <= 0 { // Size retention is disabled. + return + } + + var blocksSize = int64(0) + for i, block := range blocks { + blocksSize += block.Size() + if blocksSize > db.opts.MaxBytes { + // Add this and all following blocks for deletion. + for _, b := range blocks[i:] { + deleteable[b.meta.ULID] = b + } + db.metrics.sizeRetentionCount.Inc() + break } } - // Delete all obsolete blocks. None of them are opened any longer. - for ulid := range deleteable { + + return deleteable +} + +// deleteBlocks closes and deletes blocks from the disk. +// When the map contains a non nil block object it means it is loaded in memory +// so needs to be closed first as it might need to wait for pending readers to complete. +func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error { + for ulid, block := range blocks { + if block != nil { + if err := block.Close(); err != nil { + level.Warn(db.logger).Log("msg", "closing block failed", "err", err) + } + } if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil { return errors.Wrapf(err, "delete obsolete block %s", ulid) } } - - // Garbage collect data in the head if the most recent persisted block - // covers data of its current time range. - if len(blocks) == 0 { - return nil - } - maxt := blocks[len(blocks)-1].Meta().MaxTime - - return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") + return nil } // validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence. diff --git a/db_test.go b/db_test.go index 99bde8c5..03020dde 100644 --- a/db_test.go +++ b/db_test.go @@ -27,6 +27,7 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" + dto "github.com/prometheus/client_model/go" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" @@ -90,6 +91,9 @@ func TestDB_reloadOrder(t *testing.T) { testutil.Ok(t, db.reload()) blocks := db.Blocks() + for _, b := range blocks { + b.meta.Stats.NumBytes = 0 + } testutil.Equals(t, 3, len(blocks)) testutil.Equals(t, *metas[1], blocks[0].Meta()) testutil.Equals(t, *metas[0], blocks[1].Meta()) @@ -881,59 +885,105 @@ func (*mockCompactorFailing) Compact(dest string, dirs []string, open []*Block) } -func TestDB_Retention(t *testing.T) { - db, close := openTestDB(t, nil) +func TestTimeRetention(t *testing.T) { + db, close := openTestDB(t, &Options{ + BlockRanges: []int64{1000}, + }) defer close() + defer db.Close() - lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}} + blocks := []*BlockMeta{ + {MinTime: 500, MaxTime: 900}, // Oldest block + {MinTime: 1000, MaxTime: 1500}, + {MinTime: 1500, MaxTime: 2000}, // Newest Block + } - app := db.Appender() - _, err := app.Add(lbls, 0, 1) - testutil.Ok(t, err) - testutil.Ok(t, app.Commit()) + for _, m := range blocks { + createPopulatedBlock(t, db.Dir(), 10, m.MinTime, m.MaxTime) + } - // create snapshot to make it create a block. - // TODO(gouthamve): Add a method to compact headblock. 
- snap, err := ioutil.TempDir("", "snap") - testutil.Ok(t, err) + testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. + testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. + metrics := &dto.Metric{} // Also check the internal metrics. - defer os.RemoveAll(snap) - testutil.Ok(t, db.Snapshot(snap, true)) - testutil.Ok(t, db.Close()) + db.opts.RetentionDuration = uint64(blocks[2].MaxTime - blocks[1].MinTime) + testutil.Ok(t, db.reload()) + testutil.Ok(t, db.metrics.timeRetentionCount.Write(metrics)) - // reopen DB from snapshot - db, err = Open(snap, nil, nil, nil) - testutil.Ok(t, err) + expBlocks := blocks[1:] + actBlocks := db.Blocks() + actRetentCount := int(metrics.Counter.GetValue()) - testutil.Equals(t, 1, len(db.blocks)) + testutil.Equals(t, 1, actRetentCount, "metric retention count mismatch") + testutil.Equals(t, len(expBlocks), len(actBlocks)) + testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime) + testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime) +} - app = db.Appender() - _, err = app.Add(lbls, 100, 1) - testutil.Ok(t, err) - testutil.Ok(t, app.Commit()) +func TestSizeRetention(t *testing.T) { + db, close := openTestDB(t, &Options{ + BlockRanges: []int64{100}, + }) + defer close() + defer db.Close() - // Snapshot again to create another block. - snap, err = ioutil.TempDir("", "snap") - testutil.Ok(t, err) - defer os.RemoveAll(snap) + blocks := []*BlockMeta{ + {MinTime: 100, MaxTime: 200}, // Oldest block + {MinTime: 200, MaxTime: 300}, + {MinTime: 300, MaxTime: 400}, + {MinTime: 400, MaxTime: 500}, + {MinTime: 500, MaxTime: 600}, // Newest Block + } - testutil.Ok(t, db.Snapshot(snap, true)) - testutil.Ok(t, db.Close()) + for _, m := range blocks { + createPopulatedBlock(t, db.Dir(), 100, m.MinTime, m.MaxTime) + } - // reopen DB from snapshot - db, err = Open(snap, nil, nil, &Options{ - RetentionDuration: 10, - BlockRanges: []int64{50}, - }) - testutil.Ok(t, err) - defer db.Close() + // Test that registered size matches the actual disk size. + testutil.Ok(t, db.reload()) // Reload the db to register the new db size. + testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. + metrics := &dto.Metric{} // Use the the actual internal metrics. + testutil.Ok(t, db.metrics.blocksBytes.Write(metrics)) + expSize := int64(metrics.Gauge.GetValue()) + actSize := dbDiskSize(db.Dir()) + testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size") + + // Decrease the max bytes limit so that a delete is triggered. + // Check total size, total count and check that the oldest block was deleted. + firstBlockSize := db.Blocks()[0].Size() + sizeLimit := actSize - firstBlockSize + db.opts.MaxBytes = sizeLimit // Set the new db size limit one block smaller that the actual size. + testutil.Ok(t, db.reload()) // Reload the db to register the new db size. 
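// The size rule this test exercises, reduced to its core: walk blocks newest
// to oldest, keep a running total of their sizes, and once the total exceeds
// MaxBytes, mark that block and everything older for deletion. A standalone
// sketch with toy sizes — the real code returns *Block values keyed by ULID:

package main

import "fmt"

type blockInfo struct {
	ulid string
	size int64
}

// beyondSizeRetention assumes blocks are already sorted newest to oldest.
func beyondSizeRetention(blocks []blockInfo, maxBytes int64) []string {
	if maxBytes <= 0 { // size retention disabled
		return nil
	}
	var total int64
	var deletable []string
	for i, b := range blocks {
		total += b.size
		if total > maxBytes {
			for _, old := range blocks[i:] { // the block that crossed the limit goes too
				deletable = append(deletable, old.ulid)
			}
			break
		}
	}
	return deletable
}

func main() {
	blocks := []blockInfo{{"newest", 400}, {"mid", 400}, {"oldest", 400}}
	fmt.Println(beyondSizeRetention(blocks, 1000)) // [oldest]; the total hits 1200 > 1000 at the third block
}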
+ testutil.Ok(t, db.metrics.blocksBytes.Write(metrics)) + testutil.Ok(t, db.metrics.sizeRetentionCount.Write(metrics)) + + expBlocks := blocks[1:] + actBlocks := db.Blocks() + expSize = int64(metrics.Gauge.GetValue()) + actRetentCount := int(metrics.Counter.GetValue()) + actSize = dbDiskSize(db.Dir()) + + testutil.Equals(t, 1, actRetentCount, "metric retention count mismatch") + testutil.Equals(t, actSize, expSize, "metric db size doesn't match actual disk size") + testutil.Assert(t, expSize <= sizeLimit, "actual size (%v) is expected to be less than or equal to limit (%v)", expSize, sizeLimit) + testutil.Equals(t, len(blocks)-1, len(actBlocks), "new block count should be decreased from:%v to:%v", len(blocks), len(blocks)-1) + testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime, "maxT mismatch of the first block") + testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime, "maxT mismatch of the last block") - testutil.Equals(t, 2, len(db.blocks)) +} - // Reload blocks, which should drop blocks beyond the retention boundary. - testutil.Ok(t, db.reload()) - testutil.Equals(t, 1, len(db.blocks)) - testutil.Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify its the right block. +func dbDiskSize(dir string) int64 { + var statSize int64 + filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + // Include only index,tombstone and chunks. + if filepath.Dir(path) == chunkDir(filepath.Dir(filepath.Dir(path))) || + info.Name() == indexFilename || + info.Name() == tombstoneFilename { + statSize += info.Size() + } + return nil + }) + return statSize } func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { diff --git a/index/index.go b/index/index.go index 880fbb19..30d91bf4 100644 --- a/index/index.go +++ b/index/index.go @@ -965,6 +965,11 @@ func (r *Reader) SortedPostings(p Postings) Postings { return p } +// Size returns the size of an index file. +func (r *Reader) Size() int64 { + return int64(r.b.Len()) +} + // LabelNames returns all the unique label names present in the index. 
func (r *Reader) LabelNames() ([]string, error) { labelNamesMap := make(map[string]struct{}, len(r.labels)) diff --git a/querier_test.go b/querier_test.go index 2eb5b037..1ca8389f 100644 --- a/querier_test.go +++ b/querier_test.go @@ -1287,12 +1287,12 @@ func BenchmarkMergedSeriesSet(b *testing.B) { func BenchmarkPersistedQueries(b *testing.B) { for _, nSeries := range []int{10, 100} { - for _, nSamples := range []int{1000, 10000, 100000} { + for _, nSamples := range []int64{1000, 10000, 100000} { b.Run(fmt.Sprintf("series=%d,samplesPerSeries=%d", nSeries, nSamples), func(b *testing.B) { dir, err := ioutil.TempDir("", "bench_persisted") testutil.Ok(b, err) defer os.RemoveAll(dir) - block := createPopulatedBlock(b, dir, nSeries, nSamples) + block := createPopulatedBlock(b, dir, nSeries, 1, nSamples) defer block.Close() q, err := NewBlockQuerier(block, block.Meta().MinTime, block.Meta().MaxTime) diff --git a/tombstones.go b/tombstones.go index a1f30b59..07814040 100644 --- a/tombstones.go +++ b/tombstones.go @@ -113,37 +113,41 @@ type Stone struct { intervals Intervals } -func readTombstones(dir string) (TombstoneReader, error) { +func readTombstones(dir string) (TombstoneReader, SizeReader, error) { b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) if os.IsNotExist(err) { - return newMemTombstones(), nil + return newMemTombstones(), nil, nil } else if err != nil { - return nil, err + return nil, nil, err + } + + sr := &TombstoneFile{ + size: int64(len(b)), } if len(b) < 5 { - return nil, errors.Wrap(errInvalidSize, "tombstones header") + return nil, sr, errors.Wrap(errInvalidSize, "tombstones header") } d := &decbuf{b: b[:len(b)-4]} // 4 for the checksum. if mg := d.be32(); mg != MagicTombstone { - return nil, fmt.Errorf("invalid magic number %x", mg) + return nil, sr, fmt.Errorf("invalid magic number %x", mg) } if flag := d.byte(); flag != tombstoneFormatV1 { - return nil, fmt.Errorf("invalid tombstone format %x", flag) + return nil, sr, fmt.Errorf("invalid tombstone format %x", flag) } if d.err() != nil { - return nil, d.err() + return nil, sr, d.err() } // Verify checksum. hash := newCRC32() if _, err := hash.Write(d.get()); err != nil { - return nil, errors.Wrap(err, "write to hash") + return nil, sr, errors.Wrap(err, "write to hash") } if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() { - return nil, errors.New("checksum did not match") + return nil, sr, errors.New("checksum did not match") } stonesMap := newMemTombstones() @@ -153,13 +157,13 @@ func readTombstones(dir string) (TombstoneReader, error) { mint := d.varint64() maxt := d.varint64() if d.err() != nil { - return nil, d.err() + return nil, sr, d.err() } stonesMap.addInterval(k, Interval{mint, maxt}) } - return stonesMap, nil + return stonesMap, sr, nil } type memTombstones struct { @@ -210,6 +214,16 @@ func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) { } } +// TombstoneFile holds information about the tombstone file. +type TombstoneFile struct { + size int64 +} + +// Size returns the tombstone file size. 
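// A subtlety worth calling out for callers of the new signature: when the
// block has no tombstones file at all, readTombstones returns
// (newMemTombstones(), nil, nil), i.e. the SizeReader is nil. That is why
// blockSize skips nil readers, and why any other caller has to nil-check
// before asking for a size. A hedged fragment of the calling pattern,
// with surrounding error handling elided:

tr, tsr, err := readTombstones(dir)
if err != nil {
	return nil, err
}
var tombstoneBytes int64
if tsr != nil { // nil when no tombstones file exists on disk
	tombstoneBytes = tsr.Size()
}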
+func (t *TombstoneFile) Size() int64 { + return t.size +} + func (*memTombstones) Close() error { return nil } diff --git a/tombstones_test.go b/tombstones_test.go index e12574f1..2a106d70 100644 --- a/tombstones_test.go +++ b/tombstones_test.go @@ -46,7 +46,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) { testutil.Ok(t, writeTombstoneFile(tmpdir, stones)) - restr, err := readTombstones(tmpdir) + restr, _, err := readTombstones(tmpdir) testutil.Ok(t, err) // Compare the two readers. From 376ca5864ec3742b5d4d36d0a3db7bbf6628e8db Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Tue, 11 Dec 2018 05:54:14 +0200 Subject: [PATCH 02/10] resolved conflicts Signed-off-by: Krasi Georgiev --- CHANGELOG.md | 13 +++++ block_test.go | 10 ++-- db.go | 23 ++++++--- db_test.go | 127 ++++++++++++++++++++++++++++++++++++++++++++---- index/index.go | 2 +- querier_test.go | 2 +- 6 files changed, 154 insertions(+), 23 deletions(-) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000..eda70358 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,13 @@ +## master / unreleased + + - [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total` and new ppublic interface `SizeReader: Size() int64` + + +## 0.3.0 + + - [CHANGE] `LastCheckpoint()` used to return just the segment name and now it returns the full relative path. + - [CHANGE] `NewSegmentsRangeReader()` can now read over miltiple wal ranges by using the new `SegmentRange{}` struct. + - [CHANGE] `CorruptionErr{}` now also exposes the Segment `Dir` which is added when displaying any errors. + - [CHANGE] `Head.Init()` is changed to `Head.Init(minValidTime int64)` + - [CHANGE] `SymbolTable()` renamed to `SymbolTableSize()` to make the name consistent with the `Block{ symbolTableSize uint64 }` field. + - [CHANGE] `wal.Reader{}` now exposes `Segment()` for the current segment being read and `Offset()` for the current offset. diff --git a/block_test.go b/block_test.go index 4738cb5d..7db01a62 100644 --- a/block_test.go +++ b/block_test.go @@ -78,8 +78,8 @@ func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block { } // createPopulatedBlock creates a block with nSeries series, -// filled with samples between blockMint and blockMaxt. -func createPopulatedBlock(tb testing.TB, dir string, nSeries int, blockMint, blockMaxt int64) *Block { +// filled with samples of the given mint,maxt time range. +func createPopulatedBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) *Block { head, err := NewHead(nil, nil, nil, 2*60*60*1000) testutil.Ok(tb, err) defer head.Close() @@ -88,16 +88,16 @@ func createPopulatedBlock(tb testing.TB, dir string, nSeries int, blockMint, blo testutil.Ok(tb, err) refs := make([]uint64, nSeries) - for ; blockMint <= blockMaxt; blockMint++ { + for ts := mint; ts <= maxt; ts++ { app := head.Appender() for i, lbl := range lbls { if refs[i] != 0 { - err := app.AddFast(refs[i], int64(blockMint), rand.Float64()) + err := app.AddFast(refs[i], ts, rand.Float64()) if err == nil { continue } } - ref, err := app.Add(lbl, blockMint, rand.Float64()) + ref, err := app.Add(lbl, ts, rand.Float64()) testutil.Ok(tb, err) refs[i] = ref } diff --git a/db.go b/db.go index 03a11dd0..26befa58 100644 --- a/db.go +++ b/db.go @@ -60,6 +60,8 @@ type Options struct { // Maximum number of bytes in blocks to be retained. 
// 0 or less means disabled. + // NOTE: For proper storage calculations need to concider + // the size of the WAL folder which is not affected by this option. MaxBytes int64 // The sizes of the Blocks. @@ -185,7 +187,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { }) m.startTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "prometheus_tsdb_lowest_timestamp", - Help: "Lowest timestamp value stored in the database.", + Help: "Lowest timestamp value stored in the database. The unit is decided by the library consumer.", }, func() float64 { db.mtx.RLock() defer db.mtx.RUnlock() @@ -281,10 +283,19 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db if err != nil { return nil, err } + if err := db.reload(); err != nil { return nil, err } - if err := db.head.Init(); err != nil { + // Set the min valid time for the ingested samples + // to be no lower than the maxt of the last block. + blocks := db.Blocks() + minValidTime := int64(math.MinInt64) + if len(blocks) > 0 { + minValidTime = blocks[len(blocks)-1].Meta().MaxTime + } + + if err := db.head.Init(minValidTime); err != nil { return nil, errors.Wrap(err, "read WAL") } @@ -386,7 +397,8 @@ func (db *DB) compact() (err error) { if db.head.MaxTime()-db.head.MinTime() <= db.opts.BlockRanges[0]/2*3 { break } - mint, maxt := rangeForTimestamp(db.head.MinTime(), db.opts.BlockRanges[0]) + mint := db.head.MinTime() + maxt := rangeForTimestamp(mint, db.opts.BlockRanges[0]) // Wrap head into a range that bounds all reads to it. head := &rangeHead{ @@ -897,9 +909,8 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) { return sq, nil } -func rangeForTimestamp(t int64, width int64) (mint, maxt int64) { - mint = (t / width) * width - return mint, mint + width +func rangeForTimestamp(t int64, width int64) (maxt int64) { + return (t/width)*width + width } // Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis. diff --git a/db_test.go b/db_test.go index 03020dde..97dec6b3 100644 --- a/db_test.go +++ b/db_test.go @@ -25,8 +25,11 @@ import ( "testing" "time" + "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" dto "github.com/prometheus/client_model/go" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/index" @@ -940,11 +943,9 @@ func TestSizeRetention(t *testing.T) { } // Test that registered size matches the actual disk size. - testutil.Ok(t, db.reload()) // Reload the db to register the new db size. - testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. - metrics := &dto.Metric{} // Use the the actual internal metrics. - testutil.Ok(t, db.metrics.blocksBytes.Write(metrics)) - expSize := int64(metrics.Gauge.GetValue()) + testutil.Ok(t, db.reload()) // Reload the db to register the new db size. + testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. + expSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the the actual internal metrics. actSize := dbDiskSize(db.Dir()) testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size") @@ -954,13 +955,11 @@ func TestSizeRetention(t *testing.T) { sizeLimit := actSize - firstBlockSize db.opts.MaxBytes = sizeLimit // Set the new db size limit one block smaller that the actual size. 
testutil.Ok(t, db.reload()) // Reload the db to register the new db size. - testutil.Ok(t, db.metrics.blocksBytes.Write(metrics)) - testutil.Ok(t, db.metrics.sizeRetentionCount.Write(metrics)) expBlocks := blocks[1:] actBlocks := db.Blocks() - expSize = int64(metrics.Gauge.GetValue()) - actRetentCount := int(metrics.Counter.GetValue()) + expSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) + actRetentCount := int(prom_testutil.ToFloat64(db.metrics.sizeRetentionCount)) actSize = dbDiskSize(db.Dir()) testutil.Equals(t, 1, actRetentCount, "metric retention count mismatch") @@ -1247,6 +1246,11 @@ func TestQuerierWithBoundaryChunks(t *testing.T) { testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count) } +// TestInitializeHeadTimestamp ensures that the h.minTime is set properly. +// - no blocks no WAL: set to the time of the first appended sample +// - no blocks with WAL: set to the smallest sample from the WAL +// - with blocks no WAL: set to the last block maxT +// - with blocks with WAL: same as above func TestInitializeHeadTimestamp(t *testing.T) { t.Run("clean", func(t *testing.T) { dir, err := ioutil.TempDir("", "test_head_init") @@ -1345,11 +1349,15 @@ func TestInitializeHeadTimestamp(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, w.Close()) - db, err := Open(dir, nil, nil, nil) + r := prometheus.NewRegistry() + + db, err := Open(dir, nil, r, nil) testutil.Ok(t, err) testutil.Equals(t, int64(6000), db.head.MinTime()) testutil.Equals(t, int64(15000), db.head.MaxTime()) + // Check that old series has been GCed. + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.series)) }) } @@ -1485,3 +1493,102 @@ func TestCorrectNumTombstones(t *testing.T) { testutil.Ok(t, db.Delete(9, 11, labels.NewEqualMatcher("foo", "bar"))) testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones) } + +// TestBlockRanges checks the following use cases: +// - No samples can be added with timestamps lower than the last block maxt. +// - The compactor doesn't create overlaping blocks +// even when the last blocks is not within the default boundaries. +// - Lower bondary is based on the smallest sample in the head and +// upper boundary is rounded to the configured block range. +// +// This ensures that a snapshot that includes the head and creates a block with a custom time range +// will not overlap with the first block created by the next compaction. +func TestBlockRanges(t *testing.T) { + logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + + dir, err := ioutil.TempDir("", "test_storage") + if err != nil { + t.Fatalf("Opening test dir failed: %s", err) + } + + rangeToTriggercompaction := DefaultOptions.BlockRanges[0]/2*3 + 1 + + // Test that the compactor doesn't create overlapping blocks + // when a non standard block already exists. 
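// TestBlockRanges leans on the reworked boundary helper: rangeForTimestamp
// now returns only the exclusive upper bound of the window containing t,
// while the lower bound of a compacted block comes from the head's smallest
// sample. A worked example of the new one-liner — the 2h width is an
// assumption matching Prometheus' default, not taken from this test:

package main

import "fmt"

// rangeForTimestamp as rewritten in this series.
func rangeForTimestamp(t, width int64) (maxt int64) {
	return (t/width)*width + width
}

func main() {
	const width = 7200000 // 2h expressed in milliseconds
	fmt.Println(rangeForTimestamp(3, width))       // 7200000: t=3 lies in [0, 7200000)
	fmt.Println(rangeForTimestamp(7200000, width)) // 14400000: a boundary timestamp opens the next window
}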
+ firstBlockMaxT := int64(3) + createPopulatedBlock(t, dir, 1, 0, firstBlockMaxT) + db, err := Open(dir, logger, nil, DefaultOptions) + if err != nil { + t.Fatalf("Opening test storage failed: %s", err) + } + defer func() { + os.RemoveAll(dir) + }() + app := db.Appender() + lbl := labels.Labels{{"a", "b"}} + _, err = app.Add(lbl, firstBlockMaxT-1, rand.Float64()) + if err == nil { + t.Fatalf("appending a sample with a timestamp covered by a previous block shouldn't be possible") + } + _, err = app.Add(lbl, firstBlockMaxT+1, rand.Float64()) + testutil.Ok(t, err) + _, err = app.Add(lbl, firstBlockMaxT+2, rand.Float64()) + testutil.Ok(t, err) + secondBlockMaxt := firstBlockMaxT + rangeToTriggercompaction + _, err = app.Add(lbl, secondBlockMaxt, rand.Float64()) // Add samples to trigger a new compaction + + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + for x := 1; x < 10; x++ { + if len(db.Blocks()) == 2 { + break + } + time.Sleep(100 * time.Millisecond) + } + testutil.Equals(t, 2, len(db.Blocks()), "no new block created after the set timeout") + + if db.Blocks()[0].Meta().MaxTime > db.Blocks()[1].Meta().MinTime { + t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[0].Meta(), db.Blocks()[1].Meta()) + } + + // Test that wal records are skipped when an existing block covers the same time ranges + // and compaction doesn't create an overlapping block. + db.DisableCompactions() + _, err = app.Add(lbl, secondBlockMaxt+1, rand.Float64()) + testutil.Ok(t, err) + _, err = app.Add(lbl, secondBlockMaxt+2, rand.Float64()) + testutil.Ok(t, err) + _, err = app.Add(lbl, secondBlockMaxt+3, rand.Float64()) + testutil.Ok(t, err) + _, err = app.Add(lbl, secondBlockMaxt+4, rand.Float64()) + testutil.Ok(t, app.Commit()) + testutil.Ok(t, db.Close()) + + thirdBlockMaxt := secondBlockMaxt + 2 + createPopulatedBlock(t, dir, 1, secondBlockMaxt+1, thirdBlockMaxt) + + db, err = Open(dir, logger, nil, DefaultOptions) + if err != nil { + t.Fatalf("Opening test storage failed: %s", err) + } + defer db.Close() + testutil.Equals(t, 3, len(db.Blocks()), "db doesn't include expected number of blocks") + testutil.Equals(t, db.Blocks()[2].Meta().MaxTime, thirdBlockMaxt, "unexpected maxt of the last block") + + app = db.Appender() + _, err = app.Add(lbl, thirdBlockMaxt+rangeToTriggercompaction, rand.Float64()) // Trigger a compaction + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + for x := 1; x < 10; x++ { + if len(db.Blocks()) == 4 { + break + } + time.Sleep(100 * time.Millisecond) + } + + testutil.Equals(t, 4, len(db.Blocks()), "no new block created after the set timeout") + + if db.Blocks()[2].Meta().MaxTime > db.Blocks()[3].Meta().MinTime { + t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta()) + } +} diff --git a/index/index.go b/index/index.go index 30d91bf4..bc2442ac 100644 --- a/index/index.go +++ b/index/index.go @@ -868,7 +868,7 @@ func (r *Reader) Symbols() (map[string]struct{}, error) { return res, nil } -// SymbolTable returns the symbol table that is used to resolve symbol references. +// SymbolTableSize returns the symbol table that is used to resolve symbol references. 
func (r *Reader) SymbolTableSize() uint64 { var size int for _, s := range r.symbols { diff --git a/querier_test.go b/querier_test.go index 1ca8389f..5562c837 100644 --- a/querier_test.go +++ b/querier_test.go @@ -1292,7 +1292,7 @@ func BenchmarkPersistedQueries(b *testing.B) { dir, err := ioutil.TempDir("", "bench_persisted") testutil.Ok(b, err) defer os.RemoveAll(dir) - block := createPopulatedBlock(b, dir, nSeries, 1, nSamples) + block := createPopulatedBlock(b, dir, nSeries, 1, int64(nSamples)) defer block.Close() q, err := NewBlockQuerier(block, block.Meta().MinTime, block.Meta().MaxTime) From 5357508fc062d9260737c7785bb8ac73a7afffe7 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 12 Dec 2018 12:38:35 +0200 Subject: [PATCH 03/10] nits Signed-off-by: Krasi Georgiev --- CHANGELOG.md | 2 +- block.go | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b4685797..5c604e7a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ ## master / unreleased - - [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total` and new ppublic interface `SizeReader: Size() int64` + - [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total` and new public interface `SizeReader: Size() int64` ## 0.3.0 diff --git a/block.go b/block.go index 21ae737b..837f4a76 100644 --- a/block.go +++ b/block.go @@ -144,7 +144,7 @@ type Appendable interface { // SizeReader returns the size of the object in bytes. type SizeReader interface { - //Size returns the size in bytes. + // Size returns the size in bytes. Size() int64 } @@ -313,8 +313,7 @@ func blockSize(rr ...SizeReader) int64 { var total int64 for _, r := range rr { if r != nil { - t := r.Size() - total += t + total += r.Size() } } return total From 3ce1113b196b9d7b917fbf7c8a6e837fad416f27 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 12 Dec 2018 12:45:09 +0200 Subject: [PATCH 04/10] calculate chunks size at the reader creation Signed-off-by: Krasi Georgiev --- chunks/chunks.go | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/chunks/chunks.go b/chunks/chunks.go index d282f60a..a87f77e0 100644 --- a/chunks/chunks.go +++ b/chunks/chunks.go @@ -284,17 +284,15 @@ func (b realByteSlice) Sub(start, end int) ByteSlice { // Reader implements a SeriesReader for a serialized byte stream // of series data. type Reader struct { - // The underlying bytes holding the encoded series data. - bs []ByteSlice - - // Closers for resources behind the byte slices. - cs []io.Closer - + bs []ByteSlice // The underlying bytes holding the encoded series data. + cs []io.Closer // Closers for resources behind the byte slices. + size int64 // The total size of bytes in the reader. 
pool chunkenc.Pool } func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) { cr := Reader{pool: pool, bs: bs, cs: cs} + var totalSize int64 for i, b := range cr.bs { if b.Len() < 4 { @@ -304,7 +302,9 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks { return nil, errors.Errorf("invalid magic number %x", m) } + totalSize += int64(b.Len()) } + cr.size = totalSize return &cr, nil } @@ -327,9 +327,10 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) { pool = chunkenc.NewPool() } - var bs []ByteSlice - var cs []io.Closer - + var ( + bs []ByteSlice + cs []io.Closer + ) for _, fn := range files { f, err := fileutil.OpenMmapFile(fn) if err != nil { @@ -347,11 +348,7 @@ func (s *Reader) Close() error { // Size returns the size of the chunks. func (s *Reader) Size() int64 { - var size int64 - for _, f := range s.bs { - size += int64(f.Len()) - } - return size + return s.size } func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) { From 23b9ee0726ac2feb03aa7e6278fdb64df2ccff0d Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Sat, 29 Dec 2018 08:38:41 +0200 Subject: [PATCH 05/10] simplify the metric check Signed-off-by: Krasi Georgiev --- db_test.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/db_test.go b/db_test.go index 77866b92..5f3b5022 100644 --- a/db_test.go +++ b/db_test.go @@ -30,7 +30,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" - dto "github.com/prometheus/client_model/go" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" @@ -933,17 +932,14 @@ func TestTimeRetention(t *testing.T) { testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. - metrics := &dto.Metric{} // Also check the internal metrics. 
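// What this commit swaps in: rather than draining each metric into a
// dto.Metric protobuf and reading the typed value back out, the tests use
// client_golang's testutil helper, which performs that dance for any
// collector exposing a single series. A self-contained sketch — the counter
// name is a stand-in, not one of the new TSDB metrics:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	c := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "example_retentions_total",
		Help: "Stand-in for a retention counter.",
	})
	c.Inc()
	fmt.Println(prom_testutil.ToFloat64(c)) // 1
}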
db.opts.RetentionDuration = uint64(blocks[2].MaxTime - blocks[1].MinTime) testutil.Ok(t, db.reload()) - testutil.Ok(t, db.metrics.timeRetentionCount.Write(metrics)) expBlocks := blocks[1:] actBlocks := db.Blocks() - actRetentCount := int(metrics.Counter.GetValue()) - testutil.Equals(t, 1, actRetentCount, "metric retention count mismatch") + testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.metrics.timeRetentionCount)), "metric retention count mismatch") testutil.Equals(t, len(expBlocks), len(actBlocks)) testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime) testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime) From a61892864390bfff8716df07d96d340120233dcd Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Sat, 29 Dec 2018 10:09:09 +0200 Subject: [PATCH 06/10] nit Signed-off-by: Krasi Georgiev --- querier_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/querier_test.go b/querier_test.go index 3695346f..c117382b 100644 --- a/querier_test.go +++ b/querier_test.go @@ -1232,7 +1232,7 @@ func BenchmarkPersistedQueries(b *testing.B) { dir, err := ioutil.TempDir("", "bench_persisted") testutil.Ok(b, err) defer os.RemoveAll(dir) - block, err := OpenBlock(createPopulatedBlock(b, dir, nSeries, 1, int64(nSamples)), nil) + block, err := OpenBlock(nil, createPopulatedBlock(b, dir, nSeries, 1, int64(nSamples)), nil) testutil.Ok(b, err) defer block.Close() From 34fb5afce93d339364c2cefd053243fc784b05f2 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Sat, 29 Dec 2018 13:29:09 +0200 Subject: [PATCH 07/10] merged master Signed-off-by: Krasi Georgiev --- block_test.go | 2 +- db_test.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/block_test.go b/block_test.go index 76db2a6f..789aebaa 100644 --- a/block_test.go +++ b/block_test.go @@ -46,7 +46,7 @@ func TestSetCompactionFailed(t *testing.T) { defer os.RemoveAll(tmpdir) blockDir := createBlock(t, tmpdir, 0, 0, 0) - b, err := OpenBlock(blockDir, nil) + b, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) testutil.Equals(t, false, b.meta.Compaction.Failed) testutil.Ok(t, b.setCompactionFailed()) diff --git a/db_test.go b/db_test.go index 5d97b4ad..e364114f 100644 --- a/db_test.go +++ b/db_test.go @@ -837,7 +837,7 @@ func TestTombstoneCleanFail(t *testing.T) { totalBlocks := 2 for i := 0; i < totalBlocks; i++ { blockDir := createBlock(t, db.Dir(), 0, 0, 0) - block, err := OpenBlock(blockDir, nil) + block, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) // Add some some fake tombstones to trigger the compaction. tomb := newMemTombstones() @@ -880,7 +880,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6 return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") } - block, err := OpenBlock(createBlock(c.t, dest, 0, 0, 0), nil) + block, err := OpenBlock(nil, createBlock(c.t, dest, 0, 0, 0), nil) testutil.Ok(c.t, err) testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere. c.blocks = append(c.blocks, block) @@ -918,7 +918,7 @@ func TestTimeRetention(t *testing.T) { } for _, m := range blocks { - createPopulatedBlock(t, db.Dir(), 10, m.MinTime, m.MaxTime) + createBlock(t, db.Dir(), 10, m.MinTime, m.MaxTime) } testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. 
@@ -952,7 +952,7 @@ func TestSizeRetention(t *testing.T) { } for _, m := range blocks { - createPopulatedBlock(t, db.Dir(), 100, m.MinTime, m.MaxTime) + createBlock(t, db.Dir(), 100, m.MinTime, m.MaxTime) } // Test that registered size matches the actual disk size. From 2933dbca539b963f55d78021702e52bfeba428bf Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 14 Jan 2019 15:48:50 +0200 Subject: [PATCH 08/10] gouthamve comments Signed-off-by: Krasi Georgiev --- CHANGELOG.md | 5 ++++- db.go | 16 +++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fcd45cda..a4447168 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,9 @@ ## master / unreleased - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed. - - [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total` and new public interface `SizeReader: Size() int64` + - [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change: + - added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total` + - new public interface `SizeReader: Size() int64` + - `OpenBlock` signature changed to take a logger. - [REMOVED] `PrefixMatcher` is considered unused so was removed. - [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere. diff --git a/db.go b/db.go index 83a5322b..45d5e573 100644 --- a/db.go +++ b/db.go @@ -60,8 +60,9 @@ type Options struct { // Maximum number of bytes in blocks to be retained. // 0 or less means disabled. - // NOTE: For proper storage calculations need to concider - // the size of the WAL folder which is not affected by this option. + // NOTE: For proper storage calculations need to consider + // the size of the WAL folder which is not added when calculating + // the current size of the database. MaxBytes int64 // The sizes of the Blocks. @@ -483,6 +484,7 @@ func (db *DB) reload() (err error) { } deletable := db.deletableBlocks(loadable) + // A corrupted block that is not set for deletion by deletableBlocks() should return an error. for ulid, err := range corrupted { if _, ok := deletable[ulid]; !ok { return errors.Wrap(err, "unexpected corrupted block") @@ -541,26 +543,26 @@ func (db *DB) reload() (err error) { } func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err error) { - corrupted = make(map[ulid.ULID]error) - dirs, err := blockDirs(db.dir) if err != nil { return nil, nil, errors.Wrap(err, "find blocks") } + corrupted = make(map[ulid.ULID]error) for _, dir := range dirs { - ulid, err := ulid.Parse(filepath.Base(dir)) + meta, err := readMetaFile(dir) if err != nil { level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) continue } // See if we already have the block in memory or open it otherwise. 
- block, ok := db.getBlock(ulid) + block, ok := db.getBlock(meta.ULID) if !ok { block, err = OpenBlock(db.logger, dir, db.chunkPool) if err != nil { - corrupted[ulid] = err + corrupted[meta.ULID] = err + continue } } From 33f284682b9ffeeb467e767b6747f87cae24d708 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Tue, 15 Jan 2019 23:10:08 +0200 Subject: [PATCH 09/10] ignore corrupted blocks that have been replaced by parents. Signed-off-by: Krasi Georgiev --- db.go | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/db.go b/db.go index 45d5e573..59b51b54 100644 --- a/db.go +++ b/db.go @@ -482,14 +482,22 @@ func (db *DB) reload() (err error) { if err != nil { return err } + deletable := db.deletableBlocks(loadable) - // A corrupted block that is not set for deletion by deletableBlocks() should return an error. - for ulid, err := range corrupted { - if _, ok := deletable[ulid]; !ok { - return errors.Wrap(err, "unexpected corrupted block") + // Corrupted blocks that have been replaced by parents can be safely ignored and deleted. + // This makes it resilient against the process crashing towards the end of a compaction. + // Creation of a new block and deletion of its parents cannot happen atomically. + // By creating blocks with their parents, we can pick up the deletion where it left off during a crash. + for _, block := range loadable { + for _, b := range block.Meta().Compaction.Parents { + delete(corrupted, b.ULID) + deletable[b.ULID] = nil } } + if len(corrupted) > 0 { + return errors.Wrap(err, "unexpected corrupted block") + } // All deletable blocks should not be loaded. var ( @@ -577,18 +585,6 @@ func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block { deletable := make(map[ulid.ULID]*Block) - // Delete old blocks that have been superseded by new ones by gathering their parents. - // Those parents all have newer replacements and - // can be safely deleted after loading the other blocks. - // This makes it resilient against the process crashing towards the end of a compaction. - // Creation of a new block and deletion of its parents cannot happen atomically. - // By creating blocks with their parents, we can pick up the deletion where it left off during a crash. - for _, block := range blocks { - for _, b := range block.Meta().Compaction.Parents { - deletable[b.ULID] = nil - } - } - // Sort the blocks by time - newest to oldest (largest to smallest timestamp). // This ensures that the retentions will remove the oldest blocks. sort.Slice(blocks, func(i, j int) bool { From 5f49426b14aa9508563dfaf8fa8f5db751bc1520 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 16 Jan 2019 11:15:04 +0200 Subject: [PATCH 10/10] nits Signed-off-by: Krasi Georgiev --- db.go | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/db.go b/db.go index 59b51b54..63070043 100644 --- a/db.go +++ b/db.go @@ -572,16 +572,13 @@ func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err corrupted[meta.ULID] = err continue } - } blocks = append(blocks, block) } - return blocks, corrupted, nil } -// deletableBlocks returns all blocks past retention policy -// and blocks superseded by parents after a compaction. +// deletableBlocks returns all blocks past retention policy. 
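// The invariant PATCH 09 above establishes, in miniature: a corrupted block
// only aborts reload if no loaded block names it as a compaction parent. If
// a parent does name it, the corruption is just the leftover input of a
// completed compaction, so it is queued for deletion instead. A toy
// reconciliation over plain maps — the string ULIDs are illustrative:

package main

import "fmt"

func main() {
	corrupted := map[string]error{
		"parent-1": fmt.Errorf("invalid magic number"),
	}
	deletable := map[string]struct{}{}

	// Parents recorded in the metas of the blocks that did load cleanly.
	for _, p := range []string{"parent-1", "parent-2"} {
		delete(corrupted, p)      // replaced by a child: no longer an error
		deletable[p] = struct{}{} // ...but still cleaned off disk
	}

	if len(corrupted) > 0 {
		fmt.Println("unexpected corrupted block") // reload would return an error here
		return
	}
	fmt.Println(len(deletable), "blocks queued for deletion") // 2 blocks queued for deletion
}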
func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block { deletable := make(map[ulid.ULID]*Block) @@ -603,14 +600,15 @@ func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block { } func (db *DB) beyondTimeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Block) { - deleteable = make(map[ulid.ULID]*Block) - if len(db.blocks) == 0 || db.opts.RetentionDuration == 0 { // Time retention is disabled or no blocks to work with. + // Time retention is disabled or no blocks to work with. + if len(db.blocks) == 0 || db.opts.RetentionDuration == 0 { return } + deleteable = make(map[ulid.ULID]*Block) for i, block := range blocks { - // The difference between the first block and this block is larger than the retention period so - // any blocks after that are added as deleteable. + // The difference between the first block and this block is larger than + // the retention period so any blocks after that are added as deleteable. if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > int64(db.opts.RetentionDuration) { for _, b := range blocks[i:] { deleteable[b.meta.ULID] = b @@ -619,17 +617,17 @@ func (db *DB) beyondTimeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Bl break } } - return deleteable } func (db *DB) beyondSizeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Block) { - deleteable = make(map[ulid.ULID]*Block) - if db.opts.MaxBytes <= 0 { // Size retention is disabled. + // Size retention is disabled or no blocks to work with. + if len(db.blocks) == 0 || db.opts.MaxBytes <= 0 { return } - var blocksSize = int64(0) + deleteable = make(map[ulid.ULID]*Block) + blocksSize := int64(0) for i, block := range blocks { blocksSize += block.Size() if blocksSize > db.opts.MaxBytes { @@ -641,7 +639,6 @@ func (db *DB) beyondSizeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Bl break } } - return deleteable }
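Taken end to end, the series leaves library users with a one-field opt-in for size-based retention. A hedged usage sketch follows — the directory, the 512MiB budget, and the millisecond retention value are placeholder assumptions (the library leaves timestamp units to the consumer), and the two retention rules combine, deleting a block when either marks it:

package main

import (
	"log"

	"github.com/prometheus/tsdb"
)

func main() {
	db, err := tsdb.Open("data", nil, nil, &tsdb.Options{
		BlockRanges:       tsdb.DefaultOptions.BlockRanges,
		RetentionDuration: 15 * 24 * 60 * 60 * 1000, // e.g. 15d, if samples are stamped in ms
		MaxBytes:          512 * 1024 * 1024,        // drop oldest blocks once block bytes exceed ~512MiB
	})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	// NOTE from the patch: the WAL directory is not counted toward MaxBytes,
	// so budget total disk usage accordingly.
}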