Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow kPointInTimeRecovery to recover until corrupted write batch #12840

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1263,14 +1263,32 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
TEST_SYNC_POINT_CALLBACK(
"DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch",
batch_to_use);

status = WriteBatchInternal::UpdateProtectionInfo(batch_to_use,
/*bytes_per_key=*/8);
TEST_SYNC_POINT_CALLBACK(
"DBImpl::RecoverLogFiles:UpdateProtectionInfo::status", &status);
if (!status.ok()) {
if (status.IsCorruption()) {
reporter.Corruption(record.size(), status);
continue;
} else {
// Fail DB open for non-corruption failure.
return status;
}
}
Comment on lines +1267 to +1279
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I stared at this for a while without understanding it. Maybe some comment will help: "UpdateProtectionInfo() examines the contents of the WriteBatch to calculate KV checksums. Any corruptions in the WriteBatch will be surfaced during this processing. Corruptions here indicate the WriteBatch we read was corrupted, so we follow the usual convention for reporting a Corruption in the input data. For cases where KV checksum detects a corruption introduced by the recovery process, see VerifyChecksum() below"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if this is too complicated, but you might want to VerifyChecksum() even after detecting a corruption here to check whether the write batch became ill-formed by recovery corruption (and in that case do not use reporter.Corruption()).

TEST_SYNC_POINT_CALLBACK(
"DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:checksum",
&record_checksum);
status = WriteBatchInternal::UpdateProtectionInfo(
batch_to_use, 8 /* bytes_per_key */,
batch_updated ? nullptr : &record_checksum);
if (!status.ok()) {
return status;
if (!batch_updated) {
// Verify that the write batch content has not been corrupted since it was
// read from the WAL
status =
WriteBatchInternal::VerifyChecksum(batch_to_use, record_checksum);
// Treat this as DB open failure since likely some in-memory corruption
// happened.
if (!status.ok()) {
return status;
}
}

SequenceNumber sequence = WriteBatchInternal::Sequence(batch_to_use);
Expand Down
29 changes: 29 additions & 0 deletions db/db_wal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2757,6 +2757,35 @@ TEST_F(DBWALTest, EmptyWalReopenTest) {
}
}

// Exercises WALRecoveryMode::kPointInTimeRecovery when the first corruption
// seen during WAL replay is a bad write batch. The DB is expected to open
// successfully, with its contents and latest sequence number rolled back to
// the point just before the bad write batch.
TEST_F(DBWALTest, PITWithBadWBContent) {
  Options options = CurrentOptions();
  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  Reopen(options);

  WriteOptions synced_write;
  synced_write.sync = true;

  // The first Put is the state expected to survive recovery.
  ASSERT_OK(db_->Put(synced_write, Key(1), "val1"));
  const SequenceNumber seq_before_bad_batch = db_->GetLatestSequenceNumber();
  // The second Put is the one whose recovery status gets overwritten below.
  ASSERT_OK(db_->Put(synced_write, Key(1), "val2"));

  // Number of sync point hits to let through untouched before overriding the
  // status with a corruption. Only the hit that observes zero is affected.
  int hits_to_pass = 1;
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::RecoverLogFiles:UpdateProtectionInfo::status",
      [&](void* arg) {
        const bool inject_corruption = (hits_to_pass == 0);
        --hits_to_pass;
        if (inject_corruption) {
          *static_cast<Status*>(arg) = Status::Corruption("bad WriteBatch Put");
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(TryReopen(options));
  ASSERT_EQ("val1", Get(Key(1)));
  ASSERT_EQ(seq_before_bad_batch, db_->GetLatestSequenceNumber());
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
20 changes: 10 additions & 10 deletions db/write_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3359,9 +3359,16 @@ size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
}
}

// Recomputes the XXH3 64-bit hash over the batch's serialized contents
// (wb->rep_) and compares it with `expected`.
// Returns Status::OK() when the two hashes match, and a Corruption status
// otherwise.
Status WriteBatchInternal::VerifyChecksum(WriteBatch* wb, uint64_t expected) {
  const bool checksum_matches =
      XXH3_64bits(wb->rep_.data(), wb->rep_.size()) == expected;
  return checksum_matches
             ? Status::OK()
             : Status::Corruption("Write batch content corrupted.");
}

Status WriteBatchInternal::UpdateProtectionInfo(WriteBatch* wb,
size_t bytes_per_key,
uint64_t* checksum) {
size_t bytes_per_key) {
if (bytes_per_key == 0) {
if (wb->prot_info_ != nullptr) {
wb->prot_info_.reset();
Expand All @@ -3374,14 +3381,7 @@ Status WriteBatchInternal::UpdateProtectionInfo(WriteBatch* wb,
if (wb->prot_info_ == nullptr) {
wb->prot_info_.reset(new WriteBatch::ProtectionInfo());
ProtectionInfoUpdater prot_info_updater(wb->prot_info_.get());
Status s = wb->Iterate(&prot_info_updater);
if (s.ok() && checksum != nullptr) {
uint64_t expected_hash = XXH3_64bits(wb->rep_.data(), wb->rep_.size());
if (expected_hash != *checksum) {
return Status::Corruption("Write batch content corrupted.");
}
}
return s;
return wb->Iterate(&prot_info_updater);
} else {
// Already protected.
return Status::OK();
Expand Down
5 changes: 3 additions & 2 deletions db/write_batch_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,9 @@ class WriteBatchInternal {

// Update per-key value protection information on this write batch.
// If checksum is provided, the batch content is verified against the checksum.
static Status UpdateProtectionInfo(WriteBatch* wb, size_t bytes_per_key,
uint64_t* checksum = nullptr);
static Status UpdateProtectionInfo(WriteBatch* wb, size_t bytes_per_key);

static Status VerifyChecksum(WriteBatch* wb, uint64_t expected_checksum);
};

// LocalSavePoint is similar to a scope guard
Expand Down
Loading