Skip to content

Commit

Permalink
Snapshot Optimization (#35)
Browse files Browse the repository at this point in the history
Motivation:
The most important information inside a snapshot is its Sequence number, which allows the compaction to know if the key-value should be deleted or not.
The sequence number is being changed when modification happens in the db.
This feature allows the db to take a snapshot without acquiring db mutex when the last snapshot has the same sequence number as a new one.
In transactional db with mostly read operations, it should improve performance when used with multithreaded environment and as well other scenarios of taking large amount of snapshots with mostly read operations.

This Feature must have folly library installed.

In order to cache the snapshots, there is last_snapshot_
(folly::atomic_shared_ptr, lock free atomic_shared_ptr) in order to
access the last_snapshot_ created and point to it.
For every GetSnapshotImpl call (where snapshots are being created), the
function checks if the sequence number is different than last_snapshot_,
if no, it creates new snapshot and inside this snapshot it adds a
reference to last_snapshot_ (the reference is cached_snapshot), so this sequence number will remain inside
SnapshotList (SnapshotList is the list of the snapshots in the system and used in compaction to show which snapshots are being used), if there are still snapshots holding this sequence number. If the sequence number as changed or the last_snapshot_ is nullptr it will create the snapshot while acquiring db_mutex.

For ReleaseSnapshotImpl (deleting a snapshot).
We will unref the last_snapshot_ (using comapre_exchange_weak) and if the refcount becomes 0, it will
call Deleter and remove this snapshot entirely from the SnapshotList and
continue with taking the db mutex.
If there are still references, it will return without taking it out from
the SnapshotList nor taking the db mutex
  • Loading branch information
Or Friedmann authored and Your Name committed Jul 23, 2023
1 parent f66ae81 commit bcb828a
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 49 deletions.
12 changes: 12 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,16 @@ if(WITH_TBB)
list(APPEND THIRDPARTY_LIBS TBB::TBB)
endif()

option(WITH_SNAP_OPTIMIZATION "Optimize Snapshot performance for read mostly workload" OFF)
if(WITH_SNAP_OPTIMIZATION)
find_package(folly REQUIRED)
add_definitions(-DSPEEDB_SNAP_OPTIMIZATION)
list(APPEND THIRDPARTY_LIBS folly)
message(STATUS "Enabling RTTI in all builds - part of folly requirements")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DROCKSDB_USE_RTTI")
endif()

# Stall notifications eat some performance from inserts
option(DISABLE_STALL_NOTIF "Build with stall notifications" OFF)
if(DISABLE_STALL_NOTIF)
Expand All @@ -446,6 +456,7 @@ endif()


# RTTI is by default AUTO which enables it in debug and disables it in release.
if(NOT WITH_SNAP_OPTIMIZATION)
set(USE_RTTI AUTO CACHE STRING "Enable RTTI in builds")
set_property(CACHE USE_RTTI PROPERTY STRINGS AUTO ON OFF)
if(USE_RTTI STREQUAL "AUTO")
Expand All @@ -471,6 +482,7 @@ else()
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti")
endif()
endif()
endif()

# Used to run CI build and tests so we can run faster
option(OPTDBG "Build optimized debug build with MSVC" OFF)
Expand Down
2 changes: 2 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ we still write it with its value to SST file.
This feature keeps only the delete record and reduce SST size for later compaction.
(#411)

* Snapshot optimization - The most important information inside a snapshot is its Sequence number, which allows the compaction to know if the key-value should be deleted or not. The sequence number is being changed when modification happens in the db. This feature allows the db to take a snapshot without acquiring db mutex when the last snapshot has the same sequence number as a new one. In transactional db with mostly read operations, it should improve performance when used with multithreaded environment and as well other scenarios of taking large amount of snapshots with mostly read operations.

### Enhancements
* CI: add a workflow for building and publishing jar to maven central (#507)
* LOG: Compaction job traces - report cf name and job id (#511)
Expand Down
31 changes: 31 additions & 0 deletions cmake/modules/FindFolly.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
find_path(FOLLY_ROOT_DIR
NAMES include/folly/folly-config.h
)

find_library(FOLLY_LIBRARIES
NAMES folly
HINTS ${FOLLY_ROOT_DIR}/lib
)

find_library(FOLLY_BENCHMARK_LIBRARIES
NAMES follybenchmark
HINTS ${FOLLY_ROOT_DIR}/lib
)

find_path(FOLLY_INCLUDE_DIR
NAMES folly/folly-config.h
HINTS ${FOLLY_ROOT_DIR}/include
)

include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(Folly DEFAULT_MSG
FOLLY_LIBRARIES
FOLLY_INCLUDE_DIR
)

mark_as_advanced(
FOLLY_ROOT_DIR
FOLLY_LIBRARIES
FOLLY_BENCHMARK_LIBRARIES
FOLLY_INCLUDE_DIR
)
53 changes: 21 additions & 32 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
nonmem_write_thread_(immutable_db_options_),
write_controller_(immutable_db_options_.write_controller),
last_batch_group_size_(0),
snapshots_(immutable_db_options_.clock),
unscheduled_flushes_(0),
unscheduled_compactions_(0),
bg_bottom_compaction_scheduled_(0),
Expand Down Expand Up @@ -3714,27 +3715,22 @@ Status DBImpl::GetTimestampedSnapshots(

SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
bool lock) {
int64_t unix_time = 0;
immutable_db_options_.clock->GetCurrentTime(&unix_time)
.PermitUncheckedError(); // Ignore error
SnapshotImpl* s = new SnapshotImpl;
if (!is_snapshot_supported_) {
return nullptr;
}
SnapshotImpl* snapshot = snapshots_.RefSnapshot(is_write_conflict_boundary,
GetLastPublishedSequence());
if (snapshot) {
return snapshot;
}

if (lock) {
mutex_.Lock();
} else {
mutex_.AssertHeld();
}
// returns null if the underlying memtable does not support snapshot.
if (!is_snapshot_supported_) {
if (lock) {
mutex_.Unlock();
}
delete s;
return nullptr;
}
auto snapshot_seq = GetLastPublishedSequence();
SnapshotImpl* snapshot =
snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
snapshot =
snapshots_.New(GetLastPublishedSequence(), is_write_conflict_boundary);
if (lock) {
mutex_.Unlock();
}
Expand All @@ -3744,10 +3740,11 @@ SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
std::pair<Status, std::shared_ptr<const SnapshotImpl>>
DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts,
bool lock) {
int64_t unix_time = 0;
immutable_db_options_.clock->GetCurrentTime(&unix_time)
.PermitUncheckedError(); // Ignore error
SnapshotImpl* s = new SnapshotImpl;
// returns null if the underlying memtable does not support snapshot.
if (!is_snapshot_supported_) {
return std::make_pair(
Status::NotSupported("Memtable does not support snapshot"), nullptr);
}

const bool need_update_seq = (snapshot_seq != kMaxSequenceNumber);

Expand All @@ -3756,16 +3753,6 @@ DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts,
} else {
mutex_.AssertHeld();
}
// returns null if the underlying memtable does not support snapshot.
if (!is_snapshot_supported_) {
if (lock) {
mutex_.Unlock();
}
delete s;
return std::make_pair(
Status::NotSupported("Memtable does not support snapshot"), nullptr);
}

// Caller is not write thread, thus didn't provide a valid snapshot_seq.
// Obtain seq from db.
if (!need_update_seq) {
Expand Down Expand Up @@ -3815,15 +3802,14 @@ DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts,
if (lock) {
mutex_.Unlock();
}
delete s;
return std::make_pair(status, ret);
} else {
status.PermitUncheckedError();
}
}

SnapshotImpl* snapshot =
snapshots_.New(s, snapshot_seq, unix_time,
snapshots_.New(snapshot_seq,
/*is_write_conflict_boundary=*/true, ts);

std::shared_ptr<const SnapshotImpl> ret(
Expand Down Expand Up @@ -3870,9 +3856,13 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
return;
}
const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s);
if (snapshots_.UnRefSnapshot(casted_s)) {
return;
}
{
InstrumentedMutexLock l(&mutex_);
snapshots_.Delete(casted_s);
std::unique_lock<std::mutex> snapshotlist_lock(snapshots_.lock_);
uint64_t oldest_snapshot;
if (snapshots_.empty()) {
oldest_snapshot = GetLastPublishedSequence();
Expand Down Expand Up @@ -3913,7 +3903,6 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
bottommost_files_mark_threshold_ = new_bottommost_files_mark_threshold;
}
}
delete casted_s;
}

Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
Expand Down
5 changes: 5 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,11 @@ class DBImpl : public DB {
std::vector<Iterator*>* iterators) override;

virtual const Snapshot* GetSnapshot() override;
// Will unref a snapshot copy
// Returns true if the snapshot has not been deleted from SnapshotList
bool UnRefSnapshot(const SnapshotImpl* snapshot, bool& is_cached_snapshot);
// true if the snapshot provided has been referenced, otherwise false
bool RefSnapshot(bool is_write_conflict_boundary, SnapshotImpl* snapshot);
virtual void ReleaseSnapshot(const Snapshot* snapshot) override;
// Create a timestamped snapshot. This snapshot can be shared by multiple
// readers. If any of them uses it for write conflict checking, then
Expand Down
40 changes: 40 additions & 0 deletions db/db_test2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2048,6 +2048,46 @@ TEST_F(DBTest2, DuplicateSnapshot) {
}
}

#ifdef SPEEDB_SNAP_OPTIMIZATION
// This test should run only if there is snapshot optimization enabled
TEST_F(DBTest2, RefSnapshot) {
Options options;
options = CurrentOptions(options);
std::vector<const Snapshot*> snapshots;
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
SequenceNumber oldest_ww_snap, first_ww_snap;

ASSERT_OK(Put("k", "v")); // inc seq
snapshots.push_back(db_->GetSnapshot());
snapshots.push_back(db_->GetSnapshot());
ASSERT_OK(Put("k", "v")); // inc seq
snapshots.push_back(db_->GetSnapshot());
snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary());
first_ww_snap = snapshots.back()->GetSequenceNumber();
ASSERT_OK(Put("k", "v")); // inc seq
snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary());
snapshots.push_back(db_->GetSnapshot());
ASSERT_OK(Put("k", "v")); // inc seq
snapshots.push_back(db_->GetSnapshot());
snapshots.push_back(db_->GetSnapshot()); // this should create a reference

{
InstrumentedMutexLock l(dbi->mutex());
auto seqs = dbi->snapshots().GetAll(&oldest_ww_snap);
ASSERT_EQ(seqs.size(), 4); // duplicates are not counted
ASSERT_EQ(oldest_ww_snap, first_ww_snap);
ASSERT_EQ(dbi->snapshots().count(),
6); // how many snapshots stored in SnapshotList
ASSERT_EQ(dbi->snapshots().logical_count(),
8); // how many snapshots in the system
}

for (auto s : snapshots) {
db_->ReleaseSnapshot(s);
}
}
#endif

class PinL0IndexAndFilterBlocksTest
: public DBTestBase,
public testing::WithParamInterface<std::tuple<bool, bool>> {
Expand Down
Loading

0 comments on commit bcb828a

Please sign in to comment.