diff --git a/CMakeLists.txt b/CMakeLists.txt index 2972cdc1fb..066ba228a1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -427,6 +427,16 @@ if(WITH_TBB) list(APPEND THIRDPARTY_LIBS TBB::TBB) endif() +option(WITH_SNAP_OPTIMIZATION "Optimize Snapshot performance for read mostly workload" OFF) +if(WITH_SNAP_OPTIMIZATION) + find_package(folly REQUIRED) + add_definitions(-DSPEEDB_SNAP_OPTIMIZATION) + list(APPEND THIRDPARTY_LIBS folly) + message(STATUS "Enabling RTTI in all builds - part of folly requirements") + set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI") + set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DROCKSDB_USE_RTTI") +endif() + # Stall notifications eat some performance from inserts option(DISABLE_STALL_NOTIF "Build with stall notifications" OFF) if(DISABLE_STALL_NOTIF) @@ -446,6 +456,7 @@ endif() # RTTI is by default AUTO which enables it in debug and disables it in release. +if(NOT WITH_SNAP_OPTIMIZATION) set(USE_RTTI AUTO CACHE STRING "Enable RTTI in builds") set_property(CACHE USE_RTTI PROPERTY STRINGS AUTO ON OFF) if(USE_RTTI STREQUAL "AUTO") @@ -471,6 +482,7 @@ else() set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti") endif() endif() +endif() # Used to run CI build and tests so we can run faster option(OPTDBG "Build optimized debug build with MSVC" OFF) diff --git a/HISTORY.md b/HISTORY.md index eb1397df27..ea46cfeabd 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -26,6 +26,8 @@ we still write it with its value to SST file. This feature keeps only the delete record and reduce SST size for later compaction. (#411) +* Snapshot optimization - The most important information inside a snapshot is its Sequence number, which allows the compaction to know if the key-value should be deleted or not. The sequence number is being changed when modification happens in the db. This feature allows the db to take a snapshot without acquiring db mutex when the last snapshot has the same sequence number as a new one. 
In a transactional db with mostly read operations, it should improve performance when used in a multithreaded environment, as well as in other scenarios that take a large number of snapshots with mostly read operations. + ### Enhancements * CI: add a workflow for building and publishing jar to maven central (#507) * LOG: Compaction job traces - report cf name and job id (#511) diff --git a/cmake/modules/FindFolly.cmake b/cmake/modules/FindFolly.cmake new file mode 100644 index 0000000000..9b12b6730f --- /dev/null +++ b/cmake/modules/FindFolly.cmake @@ -0,0 +1,31 @@ +find_path(FOLLY_ROOT_DIR + NAMES include/folly/folly-config.h +) + +find_library(FOLLY_LIBRARIES + NAMES folly + HINTS ${FOLLY_ROOT_DIR}/lib +) + +find_library(FOLLY_BENCHMARK_LIBRARIES + NAMES follybenchmark + HINTS ${FOLLY_ROOT_DIR}/lib +) + +find_path(FOLLY_INCLUDE_DIR + NAMES folly/folly-config.h + HINTS ${FOLLY_ROOT_DIR}/include +) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(Folly DEFAULT_MSG + FOLLY_LIBRARIES + FOLLY_INCLUDE_DIR +) + +mark_as_advanced( + FOLLY_ROOT_DIR + FOLLY_LIBRARIES + FOLLY_BENCHMARK_LIBRARIES + FOLLY_INCLUDE_DIR +) \ No newline at end of file diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index f85d800cb8..5bc218d4a1 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -3714,10 +3714,16 @@ Status DBImpl::GetTimestampedSnapshots( SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary, bool lock) { + if (!is_snapshot_supported_) { + return nullptr; + } int64_t unix_time = 0; - immutable_db_options_.clock->GetCurrentTime(&unix_time) - .PermitUncheckedError(); // Ignore error SnapshotImpl* s = new SnapshotImpl; +#ifdef SPEEDB_SNAP_OPTIMIZATION + if (RefSnapshot(unix_time, is_write_conflict_boundary, s)) { + return s; + } +#endif if (lock) { mutex_.Lock(); @@ -3732,6 +3738,8 @@ SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary, delete s; return nullptr; } + 
immutable_db_options_.clock->GetCurrentTime(&unix_time) + .PermitUncheckedError(); // Ignore error auto snapshot_seq = GetLastPublishedSequence(); SnapshotImpl* snapshot = snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary); @@ -3862,6 +3870,47 @@ bool CfdListContains(const CfdList& list, ColumnFamilyData* cfd) { } } // namespace +#ifdef SPEEDB_SNAP_OPTIMIZATION +bool DBImpl::UnRefSnapshot(const SnapshotImpl* snapshot, + bool& is_cached_snapshot) { + SnapshotImpl* snap = const_cast<SnapshotImpl*>(snapshot); + if (snap->cached_snapshot) { + snapshots_.logical_count_.fetch_sub(1); + is_cached_snapshot = true; + size_t cnt = snap->cached_snapshot->refcount.fetch_sub(1); + if (cnt < 2) { + snapshots_.last_snapshot_.compare_exchange_weak(snap->cached_snapshot, + nullptr); + } + delete snap; + } + if (!snapshots_.deleteitem && is_cached_snapshot) { + return true; + } + return false; +} + +bool DBImpl::RefSnapshot(int64_t unix_time, bool is_write_conflict_boundary, + SnapshotImpl* snapshot) { + std::shared_ptr<SnapshotImpl> shared_snap = snapshots_.last_snapshot_; + if (shared_snap && + shared_snap->GetSequenceNumber() == GetLastPublishedSequence() && + shared_snap->is_write_conflict_boundary_ == is_write_conflict_boundary) { + immutable_db_options_.clock->GetCurrentTime(&unix_time) + .PermitUncheckedError(); // Ignore error + snapshot->cached_snapshot = shared_snap; + snapshots_.logical_count_.fetch_add(1); + shared_snap->refcount.fetch_add(1); + snapshot->number_ = shared_snap->GetSequenceNumber(); + snapshot->unix_time_ = unix_time; + snapshot->is_write_conflict_boundary_ = is_write_conflict_boundary; + return true; + } + return false; +} + +#endif + void DBImpl::ReleaseSnapshot(const Snapshot* s) { if (s == nullptr) { // DBImpl::GetSnapshot() can return nullptr when snapshot @@ -3870,9 +3919,24 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) { return; } const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s); +#ifdef SPEEDB_SNAP_OPTIMIZATION + bool is_cached_snapshot = false; + 
if (UnRefSnapshot(casted_s, is_cached_snapshot)) { + return; + } +#endif { InstrumentedMutexLock l(&mutex_); +#ifdef SPEEDB_SNAP_OPTIMIZATION + std::scoped_lock snaplock(snapshots_.lock); + snapshots_.deleteitem = false; + if (!is_cached_snapshot) { + snapshots_.Delete(casted_s); + delete casted_s; + } +#else snapshots_.Delete(casted_s); +#endif uint64_t oldest_snapshot; if (snapshots_.empty()) { oldest_snapshot = GetLastPublishedSequence(); @@ -3913,7 +3977,9 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) { bottommost_files_mark_threshold_ = new_bottommost_files_mark_threshold; } } +#ifndef SPEEDB_SNAP_OPTIMIZATION delete casted_s; +#endif } Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family, diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index d7c1f2e4a7..b4618127b9 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -345,6 +345,12 @@ class DBImpl : public DB { std::vector* iterators) override; virtual const Snapshot* GetSnapshot() override; + // Will unref a snapshot copy + // Returns true if the snapshot has not been deleted from SnapshotList + bool UnRefSnapshot(const SnapshotImpl* snapshot, bool& is_cached_snapshot); + // true if the snapshot provided has been referenced, otherwise false + bool RefSnapshot(int64_t unix_time, bool is_write_conflict_boundary, + SnapshotImpl* snapshot); virtual void ReleaseSnapshot(const Snapshot* snapshot) override; // Create a timestamped snapshot. This snapshot can be shared by multiple // readers. 
If any of them uses it for write conflict checking, then diff --git a/db/db_test2.cc b/db/db_test2.cc index 974d55bd18..725f19e8ed 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2048,6 +2048,46 @@ TEST_F(DBTest2, DuplicateSnapshot) { } } +#ifdef SPEEDB_SNAP_OPTIMIZATION +// This test should run only if there is snapshot optimization enabled +TEST_F(DBTest2, RefSnapshot) { + Options options; + options = CurrentOptions(options); + std::vector snapshots; + DBImpl* dbi = static_cast_with_check(db_); + SequenceNumber oldest_ww_snap, first_ww_snap; + + ASSERT_OK(Put("k", "v")); // inc seq + snapshots.push_back(db_->GetSnapshot()); + snapshots.push_back(db_->GetSnapshot()); + ASSERT_OK(Put("k", "v")); // inc seq + snapshots.push_back(db_->GetSnapshot()); + snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary()); + first_ww_snap = snapshots.back()->GetSequenceNumber(); + ASSERT_OK(Put("k", "v")); // inc seq + snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary()); + snapshots.push_back(db_->GetSnapshot()); + ASSERT_OK(Put("k", "v")); // inc seq + snapshots.push_back(db_->GetSnapshot()); + snapshots.push_back(db_->GetSnapshot()); // this should create a reference + + { + InstrumentedMutexLock l(dbi->mutex()); + auto seqs = dbi->snapshots().GetAll(&oldest_ww_snap); + ASSERT_EQ(seqs.size(), 4); // duplicates are not counted + ASSERT_EQ(oldest_ww_snap, first_ww_snap); + ASSERT_EQ(dbi->snapshots().count(), + 6); // how many snapshots stored in SnapshotList + ASSERT_EQ(dbi->snapshots().logical_count(), + 8); // how many snapshots in the system + } + + for (auto s : snapshots) { + db_->ReleaseSnapshot(s); + } +} +#endif + class PinL0IndexAndFilterBlocksTest : public DBTestBase, public testing::WithParamInterface> { diff --git a/db/snapshot_impl.h b/db/snapshot_impl.h index 23e5e98cd2..631386cfcb 100644 --- a/db/snapshot_impl.h +++ b/db/snapshot_impl.h @@ -12,7 +12,13 @@ #include "db/dbformat.h" #include "rocksdb/db.h" +#include "rocksdb/types.h" #include 
"util/autovector.h" +///* will enable if the performance tests will require it +#include "folly/concurrency/AtomicSharedPtr.h" +//*/ +#include +#include namespace ROCKSDB_NAMESPACE { @@ -22,17 +28,28 @@ class SnapshotList; // Each SnapshotImpl corresponds to a particular sequence number. class SnapshotImpl : public Snapshot { public: +#ifdef SPEEDB_SNAP_OPTIMIZATION + std::atomic_uint64_t refcount = {1}; + std::shared_ptr cached_snapshot = nullptr; + + struct Deleter { + inline void operator()(SnapshotImpl* snap) const; + }; + int64_t unix_time_; + uint64_t timestamp_; + // Will this snapshot be used by a Transaction to do write-conflict checking? + bool is_write_conflict_boundary_; +#endif SequenceNumber number_; // const after creation // It indicates the smallest uncommitted data at the time the snapshot was // taken. This is currently used by WritePrepared transactions to limit the // scope of queries to IsInSnapshot. SequenceNumber min_uncommitted_ = kMinUnCommittedSeq; - SequenceNumber GetSequenceNumber() const override { return number_; } - int64_t GetUnixTime() const override { return unix_time_; } uint64_t GetTimestamp() const override { return timestamp_; } + SequenceNumber GetSequenceNumber() const override { return number_; } private: friend class SnapshotList; @@ -42,17 +59,25 @@ class SnapshotImpl : public Snapshot { SnapshotImpl* next_; SnapshotList* list_; // just for sanity checks - +#ifndef SPEEDB_SNAP_OPTIMIZATION int64_t unix_time_; uint64_t timestamp_; // Will this snapshot be used by a Transaction to do write-conflict checking? 
bool is_write_conflict_boundary_; +#endif }; class SnapshotList { public: +#ifdef SPEEDB_SNAP_OPTIMIZATION + mutable std::mutex lock; + bool deleteitem = false; + ///* If the folly::atomic_shared_ptr will provide significant performance gain + // it will be considered as a solution + folly::atomic_shared_ptr<SnapshotImpl> last_snapshot_; +#endif SnapshotList() { list_.prev_ = &list_; list_.next_ = &list_; @@ -63,6 +88,9 @@ class SnapshotList { list_.timestamp_ = 0; list_.is_write_conflict_boundary_ = false; count_ = 0; +#ifdef SPEEDB_SNAP_OPTIMIZATION + last_snapshot_ = nullptr; +#endif } // No copy-construct. @@ -81,9 +109,29 @@ class SnapshotList { return list_.prev_; } +#ifdef SPEEDB_SNAP_OPTIMIZATION + SnapshotImpl* NewSnapRef(SequenceNumber seq, uint64_t unix_time, + bool is_write_conflict_boundary, + uint64_t ts = std::numeric_limits<uint64_t>::max()) { + // user snapshot is a reference to the snapshot inside the SnapshotList + // Unfortunately right now the snapshot api cannot return shared_ptr to the + // user so a deep copy should be created + SnapshotImpl* user_snapshot = new SnapshotImpl; + user_snapshot->unix_time_ = unix_time; + user_snapshot->is_write_conflict_boundary_ = is_write_conflict_boundary; + user_snapshot->number_ = seq; + user_snapshot->timestamp_ = ts; + return user_snapshot; + } +#endif + SnapshotImpl* New(SnapshotImpl* s, SequenceNumber seq, uint64_t unix_time, bool is_write_conflict_boundary, uint64_t ts = std::numeric_limits<uint64_t>::max()) { +#ifdef SPEEDB_SNAP_OPTIMIZATION + std::unique_lock l(lock); + logical_count_.fetch_add(1); +#endif s->number_ = seq; s->unix_time_ = unix_time; s->timestamp_ = ts; @@ -94,11 +142,25 @@ class SnapshotList { s->prev_->next_ = s; s->next_->prev_ = s; count_++; +#ifdef SPEEDB_SNAP_OPTIMIZATION + SnapshotImpl* snap = + NewSnapRef(seq, unix_time, is_write_conflict_boundary, ts); + auto new_last_snapshot = + std::shared_ptr<SnapshotImpl>(s, SnapshotImpl::Deleter{}); + // may call Deleter + l.unlock(); + last_snapshot_ = new_last_snapshot; + 
snap->cached_snapshot = last_snapshot_; + return snap; +#endif return s; } // Do not responsible to free the object. void Delete(const SnapshotImpl* s) { +#ifdef SPEEDB_SNAP_OPTIMIZATION + std::unique_lock l(lock); +#endif assert(s->list_ == this); s->prev_->next_ = s->next_; s->next_->prev_ = s->prev_; @@ -118,6 +180,9 @@ class SnapshotList { void GetAll(std::vector<SnapshotImpl*>* snap_vector, SequenceNumber* oldest_write_conflict_snapshot = nullptr, const SequenceNumber& max_seq = kMaxSequenceNumber) const { +#ifdef SPEEDB_SNAP_OPTIMIZATION + std::scoped_lock l(lock); +#endif std::vector<SnapshotImpl*>& ret = *snap_vector; // So far we have no use case that would pass a non-empty vector assert(ret.size() == 0); @@ -177,11 +242,19 @@ class SnapshotList { } uint64_t count() const { return count_; } +#ifdef SPEEDB_SNAP_OPTIMIZATION + // changing count_ always under snapshot_list mutex + uint64_t count_; + uint64_t logical_count() const { return logical_count_; } + std::atomic_uint64_t logical_count_; +#endif private: // Dummy head of doubly-linked list of snapshots SnapshotImpl list_; +#ifndef SPEEDB_SNAP_OPTIMIZATION uint64_t count_; +#endif }; // All operations on TimestampedSnapshotList must be protected by db mutex. @@ -235,5 +308,16 @@ class TimestampedSnapshotList { private: std::map<uint64_t, std::shared_ptr<const SnapshotImpl>> snapshots_; }; - +#ifdef SPEEDB_SNAP_OPTIMIZATION +inline void SnapshotImpl::Deleter::operator()(SnapshotImpl* snap) const { + if (snap->cached_snapshot == nullptr) { + std::scoped_lock l(snap->list_->lock); + snap->prev_->next_ = snap->next_; + snap->next_->prev_ = snap->prev_; + snap->list_->deleteitem = true; + snap->list_->count_--; + } + delete snap; +} +#endif } // namespace ROCKSDB_NAMESPACE