From d30509c64bcc43a11206d15fc334caf3365d51ef Mon Sep 17 00:00:00 2001 From: Or Friedmann Date: Thu, 18 May 2023 10:53:00 +0300 Subject: [PATCH] Snapshot Optimization (https://github.com/speedb-io/speedb/issues/35) Motivation: The most important information inside a snapshot is its Sequence number, which allows the compaction to know if the key-value should be deleted or not. The sequence number is being changed when modification happens in the db. This feature allows the db to take a snapshot without acquiring db mutex when the last snapshot has the same sequence number as a new one. In transactional db with mostly read operations, it should improve performance when used in a multithreaded environment, as well as in other scenarios that take a large amount of snapshots with mostly read operations. This feature requires the folly library to be installed. In order to cache the snapshots, there is last_snapshot_ (folly::atomic_shared_ptr, lock free atomic_shared_ptr) in order to access the last_snapshot_ created and point to it. For every GetSnapshotImpl call (where snapshots are being created), the function checks if the sequence number is different than last_snapshot_; if not, it creates a new snapshot and inside this snapshot it adds a reference to last_snapshot_ (the reference is cached_snapshot), so this sequence number will remain inside SnapshotList (SnapshotList is the list of the snapshots in the system and used in compaction to show which snapshots are being used), if there are still snapshots holding this sequence number. If the sequence number has changed or the last_snapshot_ is nullptr, it will create the snapshot while acquiring db_mutex. For ReleaseSnapshotImpl (deleting a snapshot), we will unref the last_snapshot_ (using compare_exchange_weak) and if the refcount becomes 0, it will call Deleter and remove this snapshot entirely from the SnapshotList and continue with taking the db mutex. 
If there are still references, it will return without taking it out from the SnapshotList or taking the db mutex --- CMakeLists.txt | 12 +++ HISTORY.md | 2 + cmake/modules/FindFolly.cmake | 31 ++++++++ db/db_impl/db_impl.cc | 37 +++++---- db/db_impl/db_impl.h | 6 ++ db/db_test2.cc | 40 ++++++++++ db/snapshot_impl.h | 139 +++++++++++++++++++++++++++++++--- 7 files changed, 235 insertions(+), 32 deletions(-) create mode 100644 cmake/modules/FindFolly.cmake diff --git a/CMakeLists.txt b/CMakeLists.txt index 2972cdc1fb..066ba228a1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -427,6 +427,16 @@ if(WITH_TBB) list(APPEND THIRDPARTY_LIBS TBB::TBB) endif() +option(WITH_SNAP_OPTIMIZATION "Optimize Snapshot performance for read mostly workload" OFF) +if(WITH_SNAP_OPTIMIZATION) + find_package(folly REQUIRED) + add_definitions(-DSPEEDB_SNAP_OPTIMIZATION) + list(APPEND THIRDPARTY_LIBS folly) + message(STATUS "Enabling RTTI in all builds - part of folly requirements") + set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI") + set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DROCKSDB_USE_RTTI") +endif() + # Stall notifications eat some performance from inserts option(DISABLE_STALL_NOTIF "Build with stall notifications" OFF) if(DISABLE_STALL_NOTIF) @@ -446,6 +456,7 @@ endif() # RTTI is by default AUTO which enables it in debug and disables it in release. +if(NOT WITH_SNAP_OPTIMIZATION) set(USE_RTTI AUTO CACHE STRING "Enable RTTI in builds") set_property(CACHE USE_RTTI PROPERTY STRINGS AUTO ON OFF) if(USE_RTTI STREQUAL "AUTO") @@ -471,6 +482,7 @@ else() set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti") endif() endif() +endif() # Used to run CI build and tests so we can run faster option(OPTDBG "Build optimized debug build with MSVC" OFF) diff --git a/HISTORY.md b/HISTORY.md index eb1397df27..ea46cfeabd 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -26,6 +26,8 @@ we still write it with its value to SST file. 
This feature keeps only the delete record and reduce SST size for later compaction. (#411) +* Snapshot optimization - The most important information inside a snapshot is its Sequence number, which allows the compaction to know if the key-value should be deleted or not. The sequence number is being changed when modification happens in the db. This feature allows the db to take a snapshot without acquiring db mutex when the last snapshot has the same sequence number as a new one. In transactional db with mostly read operations, it should improve performance when used with multithreaded environment and as well other scenarios of taking large amount of snapshots with mostly read operations. + ### Enhancements * CI: add a workflow for building and publishing jar to maven central (#507) * LOG: Compaction job traces - report cf name and job id (#511) diff --git a/cmake/modules/FindFolly.cmake b/cmake/modules/FindFolly.cmake new file mode 100644 index 0000000000..9b12b6730f --- /dev/null +++ b/cmake/modules/FindFolly.cmake @@ -0,0 +1,31 @@ +find_path(FOLLY_ROOT_DIR + NAMES include/folly/folly-config.h +) + +find_library(FOLLY_LIBRARIES + NAMES folly + HINTS ${FOLLY_ROOT_DIR}/lib +) + +find_library(FOLLY_BENCHMARK_LIBRARIES + NAMES follybenchmark + HINTS ${FOLLY_ROOT_DIR}/lib +) + +find_path(FOLLY_INCLUDE_DIR + NAMES folly/folly-config.h + HINTS ${FOLLY_ROOT_DIR}/include +) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(Folly DEFAULT_MSG + FOLLY_LIBRARIES + FOLLY_INCLUDE_DIR +) + +mark_as_advanced( + FOLLY_ROOT_DIR + FOLLY_LIBRARIES + FOLLY_BENCHMARK_LIBRARIES + FOLLY_INCLUDE_DIR +) \ No newline at end of file diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index f85d800cb8..ff0f0b79a8 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -3714,27 +3714,22 @@ Status DBImpl::GetTimestampedSnapshots( SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary, bool lock) { - int64_t unix_time = 0; - 
immutable_db_options_.clock->GetCurrentTime(&unix_time) - .PermitUncheckedError(); // Ignore error - SnapshotImpl* s = new SnapshotImpl; + if (!is_snapshot_supported_) { + return nullptr; + } + SnapshotImpl* snapshot = snapshots_.RefSnapshot( + is_write_conflict_boundary, GetLastPublishedSequence(), GetSystemClock()); + if (snapshot) { + return snapshot; + } if (lock) { mutex_.Lock(); } else { mutex_.AssertHeld(); } - // returns null if the underlying memtable does not support snapshot. - if (!is_snapshot_supported_) { - if (lock) { - mutex_.Unlock(); - } - delete s; - return nullptr; - } - auto snapshot_seq = GetLastPublishedSequence(); - SnapshotImpl* snapshot = - snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary); + snapshot = + snapshots_.New(GetLastPublishedSequence(), GetSystemClock(), is_write_conflict_boundary); if (lock) { mutex_.Unlock(); } @@ -3747,7 +3742,6 @@ DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts, int64_t unix_time = 0; immutable_db_options_.clock->GetCurrentTime(&unix_time) .PermitUncheckedError(); // Ignore error - SnapshotImpl* s = new SnapshotImpl; const bool need_update_seq = (snapshot_seq != kMaxSequenceNumber); @@ -3761,7 +3755,6 @@ DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts, if (lock) { mutex_.Unlock(); } - delete s; return std::make_pair( Status::NotSupported("Memtable does not support snapshot"), nullptr); } @@ -3815,7 +3808,6 @@ DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts, if (lock) { mutex_.Unlock(); } - delete s; return std::make_pair(status, ret); } else { status.PermitUncheckedError(); @@ -3823,7 +3815,7 @@ DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts, } SnapshotImpl* snapshot = - snapshots_.New(s, snapshot_seq, unix_time, + snapshots_.New(snapshot_seq, GetSystemClock(), /*is_write_conflict_boundary=*/true, ts); std::shared_ptr ret( @@ -3862,6 +3854,7 @@ bool 
CfdListContains(const CfdList& list, ColumnFamilyData* cfd) { } } // namespace + void DBImpl::ReleaseSnapshot(const Snapshot* s) { if (s == nullptr) { // DBImpl::GetSnapshot() can return nullptr when snapshot @@ -3870,9 +3863,14 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) { return; } const SnapshotImpl* casted_s = reinterpret_cast(s); + if (snapshots_.UnRefSnapshot(casted_s)) { + return; + } { InstrumentedMutexLock l(&mutex_); snapshots_.Delete(casted_s); + std::unique_lock snapshotlist_lock(snapshots_.lock); + casted_s = nullptr; uint64_t oldest_snapshot; if (snapshots_.empty()) { oldest_snapshot = GetLastPublishedSequence(); @@ -3913,7 +3911,6 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) { bottommost_files_mark_threshold_ = new_bottommost_files_mark_threshold; } } - delete casted_s; } Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family, diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index d7c1f2e4a7..069f4e1931 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -345,6 +345,12 @@ class DBImpl : public DB { std::vector* iterators) override; virtual const Snapshot* GetSnapshot() override; + // Will unref a snapshot copy + // Returns true if the snapshot has not been deleted from SnapshotList + bool UnRefSnapshot(const SnapshotImpl* snapshot, bool& is_cached_snapshot); + // true if the snapshot provided has been referenced, otherwise false + bool RefSnapshot(bool is_write_conflict_boundary, + SnapshotImpl* snapshot); virtual void ReleaseSnapshot(const Snapshot* snapshot) override; // Create a timestamped snapshot. This snapshot can be shared by multiple // readers. 
If any of them uses it for write conflict checking, then diff --git a/db/db_test2.cc b/db/db_test2.cc index 974d55bd18..725f19e8ed 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2048,6 +2048,46 @@ TEST_F(DBTest2, DuplicateSnapshot) { } } +#ifdef SPEEDB_SNAP_OPTIMIZATION +// This test should run only if there is snapshot optimization enabled +TEST_F(DBTest2, RefSnapshot) { + Options options; + options = CurrentOptions(options); + std::vector snapshots; + DBImpl* dbi = static_cast_with_check(db_); + SequenceNumber oldest_ww_snap, first_ww_snap; + + ASSERT_OK(Put("k", "v")); // inc seq + snapshots.push_back(db_->GetSnapshot()); + snapshots.push_back(db_->GetSnapshot()); + ASSERT_OK(Put("k", "v")); // inc seq + snapshots.push_back(db_->GetSnapshot()); + snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary()); + first_ww_snap = snapshots.back()->GetSequenceNumber(); + ASSERT_OK(Put("k", "v")); // inc seq + snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary()); + snapshots.push_back(db_->GetSnapshot()); + ASSERT_OK(Put("k", "v")); // inc seq + snapshots.push_back(db_->GetSnapshot()); + snapshots.push_back(db_->GetSnapshot()); // this should create a reference + + { + InstrumentedMutexLock l(dbi->mutex()); + auto seqs = dbi->snapshots().GetAll(&oldest_ww_snap); + ASSERT_EQ(seqs.size(), 4); // duplicates are not counted + ASSERT_EQ(oldest_ww_snap, first_ww_snap); + ASSERT_EQ(dbi->snapshots().count(), + 6); // how many snapshots stored in SnapshotList + ASSERT_EQ(dbi->snapshots().logical_count(), + 8); // how many snapshots in the system + } + + for (auto s : snapshots) { + db_->ReleaseSnapshot(s); + } +} +#endif + class PinL0IndexAndFilterBlocksTest : public DBTestBase, public testing::WithParamInterface> { diff --git a/db/snapshot_impl.h b/db/snapshot_impl.h index 23e5e98cd2..787ff3976b 100644 --- a/db/snapshot_impl.h +++ b/db/snapshot_impl.h @@ -8,10 +8,13 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. 
#pragma once +#include #include #include "db/dbformat.h" +#include "folly/concurrency/AtomicSharedPtr.h" #include "rocksdb/db.h" +#include "rocksdb/types.h" #include "util/autovector.h" namespace ROCKSDB_NAMESPACE { @@ -22,17 +25,30 @@ class SnapshotList; // Each SnapshotImpl corresponds to a particular sequence number. class SnapshotImpl : public Snapshot { public: + int64_t unix_time_; + uint64_t timestamp_; + // Will this snapshot be used by a Transaction to do write-conflict checking? + bool is_write_conflict_boundary_; + +#ifdef SPEEDB_SNAP_OPTIMIZATION + std::atomic_uint64_t refcount = {1}; + std::shared_ptr cached_snapshot = nullptr; + + struct Deleter { + inline void operator()(SnapshotImpl* snap) const; + }; + // Will this snapshot be used by a Transaction to do write-conflict checking? +#endif SequenceNumber number_; // const after creation // It indicates the smallest uncommitted data at the time the snapshot was // taken. This is currently used by WritePrepared transactions to limit the // scope of queries to IsInSnapshot. SequenceNumber min_uncommitted_ = kMinUnCommittedSeq; - SequenceNumber GetSequenceNumber() const override { return number_; } - int64_t GetUnixTime() const override { return unix_time_; } uint64_t GetTimestamp() const override { return timestamp_; } + SequenceNumber GetSequenceNumber() const override { return number_; } private: friend class SnapshotList; @@ -43,16 +59,15 @@ class SnapshotImpl : public Snapshot { SnapshotList* list_; // just for sanity checks - int64_t unix_time_; - - uint64_t timestamp_; - - // Will this snapshot be used by a Transaction to do write-conflict checking? 
- bool is_write_conflict_boundary_; }; class SnapshotList { public: + mutable std::mutex lock; +#ifdef SPEEDB_SNAP_OPTIMIZATION + bool deleteitem = false; + folly::atomic_shared_ptr last_snapshot_; +#endif SnapshotList() { list_.prev_ = &list_; list_.next_ = &list_; @@ -63,6 +78,30 @@ class SnapshotList { list_.timestamp_ = 0; list_.is_write_conflict_boundary_ = false; count_ = 0; +#ifdef SPEEDB_SNAP_OPTIMIZATION + last_snapshot_ = nullptr; +#endif + } + SnapshotImpl* RefSnapshot(bool is_write_conflict_boundary, + SequenceNumber last_pub, SystemClock* clock) { +#ifdef SPEEDB_SNAP_OPTIMIZATION + std::shared_ptr shared_snap = last_snapshot_; + if (shared_snap && shared_snap->GetSequenceNumber() == last_pub && + shared_snap->is_write_conflict_boundary_ == + is_write_conflict_boundary) { + SnapshotImpl* snapshot = new SnapshotImpl; + int64_t unix_time; + clock->GetCurrentTime(&unix_time).PermitUncheckedError(); // Ignore error + snapshot->cached_snapshot = shared_snap; + logical_count_.fetch_add(1); + shared_snap->refcount.fetch_add(1); + snapshot->number_ = shared_snap->GetSequenceNumber(); + snapshot->unix_time_ = unix_time; + snapshot->is_write_conflict_boundary_ = is_write_conflict_boundary; + return snapshot; + } +#endif + return nullptr; } // No copy-construct. 
@@ -81,9 +120,55 @@ return list_.prev_; } - SnapshotImpl* New(SnapshotImpl* s, SequenceNumber seq, uint64_t unix_time, +#ifdef SPEEDB_SNAP_OPTIMIZATION + SnapshotImpl* NewSnapRef(SnapshotImpl* s, SequenceNumber seq, uint64_t unix_time, + bool is_write_conflict_boundary, + uint64_t ts = std::numeric_limits::max()) { + // user snapshot is a reference to the snapshot inside the SnapshotList + // Unfortunately right now the snapshot api cannot return shared_ptr to the + // user so a deep copy should be created + // s is the original snapshot that is being stored in the SnapshotList + SnapshotImpl* user_snapshot = new SnapshotImpl; + user_snapshot->unix_time_ = unix_time; + user_snapshot->is_write_conflict_boundary_ = is_write_conflict_boundary; + user_snapshot->number_ = seq; + user_snapshot->timestamp_ = ts; + auto new_last_snapshot = + std::shared_ptr(s, SnapshotImpl::Deleter{}); + // may call Deleter + last_snapshot_ = new_last_snapshot; + user_snapshot->cached_snapshot = last_snapshot_; + return user_snapshot; + } +#endif +bool UnRefSnapshot(const SnapshotImpl* snapshot) { + #ifdef SPEEDB_SNAP_OPTIMIZATION + SnapshotImpl* snap = const_cast(snapshot); + logical_count_.fetch_sub(1); + size_t cnt = snap->cached_snapshot->refcount.fetch_sub(1); + if (cnt < 2) { + last_snapshot_.compare_exchange_weak(snap->cached_snapshot, + nullptr); + } + delete snap; + if (!deleteitem) { + // item has not been deleted from SnapshotList + return true; + } + #endif + return false; +} + + SnapshotImpl* New(SequenceNumber seq, SystemClock* clock, bool is_write_conflict_boundary, uint64_t ts = std::numeric_limits::max()) { + SnapshotImpl* s = new SnapshotImpl; +#ifdef SPEEDB_SNAP_OPTIMIZATION + std::unique_lock l(lock); + logical_count_.fetch_add(1); +#endif + int64_t unix_time; + clock->GetCurrentTime(&unix_time).PermitUncheckedError(); // Ignore error s->number_ = seq; s->unix_time_ = unix_time; s->timestamp_ = ts; @@ -94,15 +179,25 @@ 
s->prev_->next_ = s; s->next_->prev_ = s; count_++; +#ifdef SPEEDB_SNAP_OPTIMIZATION + l.unlock(); + return NewSnapRef(s, seq, unix_time, is_write_conflict_boundary, ts); +#endif return s; } // Do not responsible to free the object. void Delete(const SnapshotImpl* s) { +#ifdef SPEEDB_SNAP_OPTIMIZATION + std::unique_lock l(lock); + deleteitem = false; +#else assert(s->list_ == this); + count_--; s->prev_->next_ = s->next_; s->next_->prev_ = s->prev_; - count_--; + delete s; +#endif } // retrieve all snapshot numbers up until max_seq. They are sorted in @@ -118,6 +213,9 @@ class SnapshotList { void GetAll(std::vector* snap_vector, SequenceNumber* oldest_write_conflict_snapshot = nullptr, const SequenceNumber& max_seq = kMaxSequenceNumber) const { +#ifdef SPEEDB_SNAP_OPTIMIZATION + std::scoped_lock l(lock); +#endif std::vector& ret = *snap_vector; // So far we have no use case that would pass a non-empty vector assert(ret.size() == 0); @@ -176,12 +274,18 @@ class SnapshotList { } } +// How many snapshots in the SnapshotList uint64_t count() const { return count_; } + // How many snapshots in the system included those that created refcount + uint64_t logical_count() const { return logical_count_; } + + std::atomic_uint64_t logical_count_ = {0}; + uint64_t count_; private: // Dummy head of doubly-linked list of snapshots SnapshotImpl list_; - uint64_t count_; + }; // All operations on TimestampedSnapshotList must be protected by db mutex. @@ -235,5 +339,16 @@ class TimestampedSnapshotList { private: std::map> snapshots_; }; - +#ifdef SPEEDB_SNAP_OPTIMIZATION +inline void SnapshotImpl::Deleter::operator()(SnapshotImpl* snap) const { + if (snap->cached_snapshot == nullptr) { + std::scoped_lock l(snap->list_->lock); + snap->list_->count_--; + snap->prev_->next_ = snap->next_; + snap->next_->prev_ = snap->prev_; + snap->list_->deleteitem = true; + } + delete snap; +} +#endif } // namespace ROCKSDB_NAMESPACE