Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
ignore corrupted blocks that have been replaced by parents.
Browse files Browse the repository at this point in the history
Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
  • Loading branch information
Krasi Georgiev committed Jan 15, 2019
1 parent 7ca26dd commit 33f2846
Showing 1 changed file with 12 additions and 16 deletions.
28 changes: 12 additions & 16 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,14 +482,22 @@ func (db *DB) reload() (err error) {
if err != nil {
return err
}

deletable := db.deletableBlocks(loadable)

// A corrupted block that is not set for deletion by deletableBlocks() should return an error.
for ulid, err := range corrupted {
if _, ok := deletable[ulid]; !ok {
return errors.Wrap(err, "unexpected corrupted block")
// Corrupted blocks that have been replaced by parents can be safely ignored and deleted.
// This makes it resilient against the process crashing towards the end of a compaction.
// Creation of a new block and deletion of its parents cannot happen atomically.
// By creating blocks with their parents, we can pick up the deletion where it left off during a crash.
for _, block := range loadable {
for _, b := range block.Meta().Compaction.Parents {
delete(corrupted, b.ULID)
deletable[b.ULID] = nil
}
}
if len(corrupted) > 0 {
return errors.Wrap(err, "unexpected corrupted block")
}

// All deletable blocks should not be loaded.
var (
Expand Down Expand Up @@ -577,18 +585,6 @@ func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err
func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block {
deletable := make(map[ulid.ULID]*Block)

// Delete old blocks that have been superseded by new ones by gathering their parents.
// Those parents all have newer replacements and
// can be safely deleted after loading the other blocks.
// This makes it resilient against the process crashing towards the end of a compaction.
// Creation of a new block and deletion of its parents cannot happen atomically.
// By creating blocks with their parents, we can pick up the deletion where it left off during a crash.
for _, block := range blocks {
for _, b := range block.Meta().Compaction.Parents {
deletable[b.ULID] = nil
}
}

// Sort the blocks by time - newest to oldest (largest to smallest timestamp).
// This ensures that the retentions will remove the oldest blocks.
sort.Slice(blocks, func(i, j int) bool {
Expand Down

0 comments on commit 33f2846

Please sign in to comment.