Skip to content

Commit

Permalink
Add a TransactionOptions to enable tracking timestamp size info insid…
Browse files Browse the repository at this point in the history
…e WriteBatch (#12864)

Summary:
In normal use cases, meta info like a column family's timestamp size is tracked at the transaction layer, so it's not necessary and even detrimental to track such info inside the internal WriteBatch, because doing so may enable anti-patterns such as bypassing the Transaction write APIs and writing directly to its internal WriteBatch, like this:
https://github.com/facebook/mysql-5.6/blob/9d0a754dc9973af0508b3ba260fc337190a3218f/storage/rocksdb/ha_rocksdb.cc#L4949-L4950
Setting this option to true will allow the aforementioned use case to continue to work until it is refactored out. This option exists only for that purpose, and it will be gradually deprecated after the aforementioned MyRocks use case is refactored.

Pull Request resolved: #12864

Test Plan: Added unit tests

Reviewed By: cbi42

Differential Revision: D60194094

Pulled By: jowlyzhang

fbshipit-source-id: 64a98822167e99aa7e4fa2a60085d44a5deaa45c
  • Loading branch information
jowlyzhang committed Aug 5, 2024
1 parent adee067 commit aee826f
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 76 deletions.
188 changes: 124 additions & 64 deletions db/write_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -928,15 +928,19 @@ Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
}

if (0 == ts_sz) {
return WriteBatchInternal::Put(this, cf_id, key, value);
s = WriteBatchInternal::Put(this, cf_id, key, value);
} else {
needs_in_place_update_ts_ = true;
has_key_with_ts_ = true;
std::string dummy_ts(ts_sz, '\0');
std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
s = WriteBatchInternal::Put(this, cf_id, SliceParts(key_with_ts.data(), 2),
SliceParts(&value, 1));
}

needs_in_place_update_ts_ = true;
has_key_with_ts_ = true;
std::string dummy_ts(ts_sz, '\0');
std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
return WriteBatchInternal::Put(this, cf_id, SliceParts(key_with_ts.data(), 2),
SliceParts(&value, 1));
if (s.ok()) {
MaybeTrackTimestampSize(cf_id, ts_sz);
}
return s;
}

Status WriteBatch::TimedPut(ColumnFamilyHandle* column_family, const Slice& key,
Expand All @@ -961,16 +965,20 @@ Status WriteBatch::TimedPut(ColumnFamilyHandle* column_family, const Slice& key,

Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& ts, const Slice& value) {
const Status s = CheckColumnFamilyTimestampSize(column_family, ts);
Status s = CheckColumnFamilyTimestampSize(column_family, ts);
if (!s.ok()) {
return s;
}
has_key_with_ts_ = true;
assert(column_family);
uint32_t cf_id = column_family->GetID();
std::array<Slice, 2> key_with_ts{{key, ts}};
return WriteBatchInternal::Put(this, cf_id, SliceParts(key_with_ts.data(), 2),
SliceParts(&value, 1));
s = WriteBatchInternal::Put(this, cf_id, SliceParts(key_with_ts.data(), 2),
SliceParts(&value, 1));
if (s.ok()) {
MaybeTrackTimestampSize(cf_id, ts.size());
}
return s;
}

Status WriteBatchInternal::CheckSlicePartsLength(const SliceParts& key,
Expand Down Expand Up @@ -1038,7 +1046,11 @@ Status WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
}

if (ts_sz == 0) {
return WriteBatchInternal::Put(this, cf_id, key, value);
s = WriteBatchInternal::Put(this, cf_id, key, value);
if (s.ok()) {
MaybeTrackTimestampSize(cf_id, ts_sz);
}
return s;
}

return Status::InvalidArgument(
Expand Down Expand Up @@ -1245,29 +1257,37 @@ Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
}

if (0 == ts_sz) {
return WriteBatchInternal::Delete(this, cf_id, key);
s = WriteBatchInternal::Delete(this, cf_id, key);
} else {
needs_in_place_update_ts_ = true;
has_key_with_ts_ = true;
std::string dummy_ts(ts_sz, '\0');
std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
s = WriteBatchInternal::Delete(this, cf_id,
SliceParts(key_with_ts.data(), 2));
}

needs_in_place_update_ts_ = true;
has_key_with_ts_ = true;
std::string dummy_ts(ts_sz, '\0');
std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
return WriteBatchInternal::Delete(this, cf_id,
SliceParts(key_with_ts.data(), 2));
if (s.ok()) {
MaybeTrackTimestampSize(cf_id, ts_sz);
}
return s;
}

Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& ts) {
const Status s = CheckColumnFamilyTimestampSize(column_family, ts);
Status s = CheckColumnFamilyTimestampSize(column_family, ts);
if (!s.ok()) {
return s;
}
assert(column_family);
has_key_with_ts_ = true;
uint32_t cf_id = column_family->GetID();
std::array<Slice, 2> key_with_ts{{key, ts}};
return WriteBatchInternal::Delete(this, cf_id,
SliceParts(key_with_ts.data(), 2));
s = WriteBatchInternal::Delete(this, cf_id,
SliceParts(key_with_ts.data(), 2));
if (s.ok()) {
MaybeTrackTimestampSize(cf_id, ts.size());
}
return s;
}

Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
Expand Down Expand Up @@ -1312,7 +1332,11 @@ Status WriteBatch::Delete(ColumnFamilyHandle* column_family,
}

if (0 == ts_sz) {
return WriteBatchInternal::Delete(this, cf_id, key);
s = WriteBatchInternal::Delete(this, cf_id, key);
if (s.ok()) {
MaybeTrackTimestampSize(cf_id, ts_sz);
}
return s;
}

return Status::InvalidArgument(
Expand Down Expand Up @@ -1360,29 +1384,37 @@ Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
}

if (0 == ts_sz) {
return WriteBatchInternal::SingleDelete(this, cf_id, key);
s = WriteBatchInternal::SingleDelete(this, cf_id, key);
} else {
needs_in_place_update_ts_ = true;
has_key_with_ts_ = true;
std::string dummy_ts(ts_sz, '\0');
std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
s = WriteBatchInternal::SingleDelete(this, cf_id,
SliceParts(key_with_ts.data(), 2));
}

needs_in_place_update_ts_ = true;
has_key_with_ts_ = true;
std::string dummy_ts(ts_sz, '\0');
std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
return WriteBatchInternal::SingleDelete(this, cf_id,
SliceParts(key_with_ts.data(), 2));
if (s.ok()) {
MaybeTrackTimestampSize(cf_id, ts_sz);
}
return s;
}

Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& ts) {
const Status s = CheckColumnFamilyTimestampSize(column_family, ts);
Status s = CheckColumnFamilyTimestampSize(column_family, ts);
if (!s.ok()) {
return s;
}
has_key_with_ts_ = true;
assert(column_family);
uint32_t cf_id = column_family->GetID();
std::array<Slice, 2> key_with_ts{{key, ts}};
return WriteBatchInternal::SingleDelete(this, cf_id,
SliceParts(key_with_ts.data(), 2));
s = WriteBatchInternal::SingleDelete(this, cf_id,
SliceParts(key_with_ts.data(), 2));
if (s.ok()) {
MaybeTrackTimestampSize(cf_id, ts.size());
}
return s;
}

Status WriteBatchInternal::SingleDelete(WriteBatch* b,
Expand Down Expand Up @@ -1429,7 +1461,11 @@ Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
}

if (0 == ts_sz) {
return WriteBatchInternal::SingleDelete(this, cf_id, key);
s = WriteBatchInternal::SingleDelete(this, cf_id, key);
if (s.ok()) {
MaybeTrackTimestampSize(cf_id, ts_sz);
}
return s;
}

return Status::InvalidArgument(
Expand Down Expand Up @@ -1479,23 +1515,27 @@ Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
}

if (0 == ts_sz) {
return WriteBatchInternal::DeleteRange(this, cf_id, begin_key, end_key);
s = WriteBatchInternal::DeleteRange(this, cf_id, begin_key, end_key);
} else {
needs_in_place_update_ts_ = true;
has_key_with_ts_ = true;
std::string dummy_ts(ts_sz, '\0');
std::array<Slice, 2> begin_key_with_ts{{begin_key, dummy_ts}};
std::array<Slice, 2> end_key_with_ts{{end_key, dummy_ts}};
s = WriteBatchInternal::DeleteRange(this, cf_id,
SliceParts(begin_key_with_ts.data(), 2),
SliceParts(end_key_with_ts.data(), 2));
}

needs_in_place_update_ts_ = true;
has_key_with_ts_ = true;
std::string dummy_ts(ts_sz, '\0');
std::array<Slice, 2> begin_key_with_ts{{begin_key, dummy_ts}};
std::array<Slice, 2> end_key_with_ts{{end_key, dummy_ts}};
return WriteBatchInternal::DeleteRange(
this, cf_id, SliceParts(begin_key_with_ts.data(), 2),
SliceParts(end_key_with_ts.data(), 2));
if (s.ok()) {
MaybeTrackTimestampSize(cf_id, ts_sz);
}
return s;
}

Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key,
const Slice& ts) {
const Status s = CheckColumnFamilyTimestampSize(column_family, ts);
Status s = CheckColumnFamilyTimestampSize(column_family, ts);
if (!s.ok()) {
return s;
}
Expand All @@ -1504,9 +1544,13 @@ Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
uint32_t cf_id = column_family->GetID();
std::array<Slice, 2> key_with_ts{{begin_key, ts}};
std::array<Slice, 2> end_key_with_ts{{end_key, ts}};
return WriteBatchInternal::DeleteRange(this, cf_id,
SliceParts(key_with_ts.data(), 2),
SliceParts(end_key_with_ts.data(), 2));
s = WriteBatchInternal::DeleteRange(this, cf_id,
SliceParts(key_with_ts.data(), 2),
SliceParts(end_key_with_ts.data(), 2));
if (s.ok()) {
MaybeTrackTimestampSize(cf_id, ts.size());
}
return s;
}

Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
Expand Down Expand Up @@ -1553,7 +1597,11 @@ Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
}

if (0 == ts_sz) {
return WriteBatchInternal::DeleteRange(this, cf_id, begin_key, end_key);
s = WriteBatchInternal::DeleteRange(this, cf_id, begin_key, end_key);
if (s.ok()) {
MaybeTrackTimestampSize(cf_id, ts_sz);
}
return s;
}

return Status::InvalidArgument(
Expand Down Expand Up @@ -1607,30 +1655,38 @@ Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
}

if (0 == ts_sz) {
return WriteBatchInternal::Merge(this, cf_id, key, value);
}

needs_in_place_update_ts_ = true;
has_key_with_ts_ = true;
std::string dummy_ts(ts_sz, '\0');
std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
s = WriteBatchInternal::Merge(this, cf_id, key, value);
} else {
needs_in_place_update_ts_ = true;
has_key_with_ts_ = true;
std::string dummy_ts(ts_sz, '\0');
std::array<Slice, 2> key_with_ts{{key, dummy_ts}};

return WriteBatchInternal::Merge(
this, cf_id, SliceParts(key_with_ts.data(), 2), SliceParts(&value, 1));
s = WriteBatchInternal::Merge(
this, cf_id, SliceParts(key_with_ts.data(), 2), SliceParts(&value, 1));
}
if (s.ok()) {
MaybeTrackTimestampSize(cf_id, ts_sz);
}
return s;
}

Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& ts, const Slice& value) {
const Status s = CheckColumnFamilyTimestampSize(column_family, ts);
Status s = CheckColumnFamilyTimestampSize(column_family, ts);
if (!s.ok()) {
return s;
}
has_key_with_ts_ = true;
assert(column_family);
uint32_t cf_id = column_family->GetID();
std::array<Slice, 2> key_with_ts{{key, ts}};
return WriteBatchInternal::Merge(
this, cf_id, SliceParts(key_with_ts.data(), 2), SliceParts(&value, 1));
s = WriteBatchInternal::Merge(this, cf_id, SliceParts(key_with_ts.data(), 2),
SliceParts(&value, 1));
if (s.ok()) {
MaybeTrackTimestampSize(cf_id, ts.size());
}
return s;
}

Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
Expand Down Expand Up @@ -1679,7 +1735,11 @@ Status WriteBatch::Merge(ColumnFamilyHandle* column_family,
}

if (0 == ts_sz) {
return WriteBatchInternal::Merge(this, cf_id, key, value);
s = WriteBatchInternal::Merge(this, cf_id, key, value);
if (s.ok()) {
MaybeTrackTimestampSize(cf_id, ts_sz);
}
return s;
}

return Status::InvalidArgument(
Expand Down
16 changes: 16 additions & 0 deletions include/rocksdb/utilities/transaction_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,22 @@ struct TransactionOptions {
// description. If a negative value is specified, then the default value from
// TransactionDBOptions is used.
int64_t write_batch_flush_threshold = -1;

// DO NOT USE.
// This is only a temporary option dedicated for MyRocks that will soon be
// removed.
// In normal use cases, meta info like column family's timestamp size is
// tracked at the transaction layer, so it's not necessary and even
// detrimental to track such info inside the internal WriteBatch because it
// may let anti-patterns like bypassing Transaction write APIs and directly
// write to its internal `WriteBatch` retrieved like this:
// https://github.com/facebook/mysql-5.6/blob/fb-mysql-8.0.32/storage/rocksdb/ha_rocksdb.cc#L4949-L4950
// Setting this option to true will allow the aforementioned use case to
// continue to work until it is refactored out.
// When this flag is enabled, we also intentionally only track the timestamp
// size in the APIs that MyRocks currently uses: Put, Merge, Delete,
// DeleteRange, and SingleDelete.
bool write_batch_track_timestamp_size = false;
};

// The per-write optimizations that do not involve transactions. TransactionDB
Expand Down
28 changes: 28 additions & 0 deletions include/rocksdb/write_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,30 @@ class WriteBatch : public WriteBatchBase {
Status UpdateTimestamps(const Slice& ts,
std::function<size_t(uint32_t /*cf*/)> ts_sz_func);

// TODO: remove these internal APIs after MyRocks refactor to not directly
// write to a `WriteBatch` retrieved from `Transaction` via
// `Transaction::GetWriteBatch`.

// Turn on/off recording of per-column-family timestamp sizes for entries
// subsequently added to this WriteBatch (see MaybeTrackTimestampSize).
void SetTrackTimestampSize(bool track_timestamp_size) {
  track_timestamp_size_ = track_timestamp_size;
}

// Record the timestamp size for `column_family_id` if tracking was enabled
// via SetTrackTimestampSize(). Only the first size recorded for a given
// column family is kept; subsequent calls with the same id are no-ops.
inline void MaybeTrackTimestampSize(uint32_t column_family_id, size_t ts_sz) {
  if (!track_timestamp_size_) {
    return;
  }
  // try_emplace inserts only when the key is absent, preserving the
  // first-recorded size with a single hash lookup instead of find + emplace.
  cf_id_to_ts_sz_.try_emplace(column_family_id, ts_sz);
}

// Return a mapping from column family id to timestamp size of all the column
// families involved in this WriteBatch.
const std::unordered_map<uint32_t, size_t>& GetColumnFamilyToTimestampSize() {
return cf_id_to_ts_sz_;
}

// Verify the per-key-value checksums of this write batch.
// Corruption status will be returned if the verification fails.
// If this write batch does not have per-key-value checksum,
Expand Down Expand Up @@ -511,6 +535,10 @@ class WriteBatch : public WriteBatchBase {

size_t default_cf_ts_sz_ = 0;

bool track_timestamp_size_ = false;

std::unordered_map<uint32_t, size_t> cf_id_to_ts_sz_;

protected:
std::string rep_; // See comment in write_batch.cc for the format of rep_
};
Expand Down
Loading

0 comments on commit aee826f

Please sign in to comment.