Snapshot Optimization (#35)
Motivation:
The most important piece of information in a snapshot is its sequence number, which lets compaction decide whether a key-value pair may be deleted. The sequence number changes whenever the db is modified. This feature allows the db to take a snapshot without acquiring the db mutex when the last snapshot has the same sequence number as the new one. In a transactional db with mostly read operations, this should improve performance in multithreaded environments, as well as in other scenarios that take a large number of snapshots under read-mostly workloads.

This feature requires the folly library to be installed.
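For illustration, a build with the option enabled might be configured as below (the build directory name and parallelism flag are just examples; folly and its dependencies must be discoverable by CMake):

```shell
# Configure and build with the snapshot optimization enabled.
cmake -S . -B build -DWITH_SNAP_OPTIMIZATION=ON
cmake --build build -j"$(nproc)"
```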

To cache snapshots, SnapshotList holds last_snapshot_ (a folly::atomic_shared_ptr, a lock-free atomic shared pointer) that points to the most recently created snapshot.
On every call to GetSnapshotImpl (where snapshots are created), the function checks whether the current sequence number differs from that of last_snapshot_. If it does not, it creates a new snapshot that holds a reference to last_snapshot_ (the reference is cached_snapshot), so the sequence number remains in SnapshotList (the list of snapshots in the system, which compaction consults to see which snapshots are in use) for as long as any snapshot still holds it. If the sequence number has changed, or last_snapshot_ is nullptr, the snapshot is created while acquiring the db mutex.
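This fast path can be sketched as follows. Note this is a simplified model with invented names (MiniDb, Snap), using the standard library's shared_ptr atomic free functions rather than folly::atomic_shared_ptr — not the actual patch:

```cpp
#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>

// Snap models the cached SnapshotImpl: a sequence number, a refcount, and a
// reference that keeps the shared (cached) snapshot alive.
struct Snap {
  uint64_t number_ = 0;                   // sequence number
  std::atomic<uint64_t> refcount{1};      // user snapshots sharing this one
  std::shared_ptr<Snap> cached_snapshot;  // reference to the shared snapshot
};

// MiniDb models DBImpl: a published sequence number, a db mutex, and the
// lock-free cache slot last_snapshot_.
struct MiniDb {
  std::atomic<uint64_t> last_published_seq{42};
  std::mutex db_mutex;                         // stand-in for DBImpl::mutex_
  std::shared_ptr<Snap> last_snapshot_;        // cache of the newest snapshot

  Snap* GetSnapshot() {
    std::shared_ptr<Snap> snap = std::atomic_load(&last_snapshot_);
    // Fast path: sequence number unchanged, so no db mutex is needed.
    if (snap && snap->number_ == last_published_seq.load()) {
      snap->refcount.fetch_add(1);
      Snap* s = new Snap;
      s->cached_snapshot = snap;  // share the underlying snapshot
      s->number_ = snap->number_;
      return s;
    }
    // Slow path: sequence number changed (or no cache yet) -> take the mutex
    // and publish a fresh snapshot into the cache slot.
    std::lock_guard<std::mutex> l(db_mutex);
    auto fresh = std::make_shared<Snap>();
    fresh->number_ = last_published_seq.load();
    std::atomic_store(&last_snapshot_, fresh);
    Snap* s = new Snap;
    s->cached_snapshot = fresh;
    s->number_ = fresh->number_;
    return s;
  }
};
```

The point of the design is that the common read-mostly case touches only an atomic pointer load and a refcount increment, never the db mutex.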

For ReleaseSnapshotImpl (deleting a snapshot):
We unref last_snapshot_ (using compare_exchange_weak); if the refcount drops to 0, the Deleter is called, the snapshot is removed entirely from SnapshotList, and we continue by taking the db mutex.
If references remain, the function returns without removing the snapshot from SnapshotList and without taking the db mutex.
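A simplified model of this release path is sketched below. The names (ReleaseModel, SharedSnap) are invented, and the patch itself uses folly::atomic_shared_ptr with compare_exchange_weak; this sketch uses the std::shared_ptr atomic free functions with a strong CAS for determinism:

```cpp
#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>

// SharedSnap stands in for the cached SnapshotImpl.
struct SharedSnap {
  std::atomic<uint64_t> refcount{1};
  bool removed_from_list = false;  // models unlinking from SnapshotList
};

// ReleaseModel stands in for DBImpl.
struct ReleaseModel {
  std::mutex db_mutex;                         // stand-in for DBImpl::mutex_
  std::shared_ptr<SharedSnap> last_snapshot_;  // cache of the newest snapshot
  int mutex_acquisitions = 0;                  // observable slow-path cost

  void ReleaseSnapshot(const std::shared_ptr<SharedSnap>& snap) {
    // fetch_sub returns the previous value: a previous value of 1 means this
    // call dropped the last reference to the shared snapshot.
    if (snap->refcount.fetch_sub(1) == 1) {
      std::lock_guard<std::mutex> l(db_mutex);  // slow path: db mutex taken
      ++mutex_acquisitions;
      snap->removed_from_list = true;  // the Deleter would unlink it here
      // Clear the cache slot only if it still points at this snapshot.
      std::shared_ptr<SharedSnap> expected = snap;
      std::atomic_compare_exchange_strong(&last_snapshot_, &expected,
                                          std::shared_ptr<SharedSnap>());
    }
    // Otherwise other user snapshots still reference this sequence number:
    // return without touching SnapshotList or the db mutex.
  }
};
```

As long as other snapshots still pin the same sequence number, releasing one costs a single atomic decrement.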
Or Friedmann committed Jun 14, 2023
1 parent cd136e1 commit edcfc11
Showing 5 changed files with 203 additions and 17 deletions.
12 changes: 12 additions & 0 deletions CMakeLists.txt
@@ -427,6 +427,16 @@ if(WITH_TBB)
list(APPEND THIRDPARTY_LIBS TBB::TBB)
endif()

option(WITH_SNAP_OPTIMIZATION "Optimize Snapshot performance for read mostly workload" ON)
if(WITH_SNAP_OPTIMIZATION)
find_package(folly REQUIRED)
add_definitions(-DROCKSDB_SNAP_OPTIMIZATION)
list(APPEND THIRDPARTY_LIBS folly)
message(STATUS "Enabling RTTI in all builds - part of folly requirements")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DROCKSDB_USE_RTTI")
endif()

# Stall notifications eat some performance from inserts
option(DISABLE_STALL_NOTIF "Build with stall notifications" OFF)
if(DISABLE_STALL_NOTIF)
@@ -446,6 +456,7 @@ endif()


# RTTI is by default AUTO which enables it in debug and disables it in release.
if(NOT WITH_SNAP_OPTIMIZATION)
set(USE_RTTI AUTO CACHE STRING "Enable RTTI in builds")
set_property(CACHE USE_RTTI PROPERTY STRINGS AUTO ON OFF)
if(USE_RTTI STREQUAL "AUTO")
@@ -471,6 +482,7 @@ else()
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti")
endif()
endif()
endif()

# Used to run CI build and tests so we can run faster
option(OPTDBG "Build optimized debug build with MSVC" OFF)
2 changes: 2 additions & 0 deletions HISTORY.md
@@ -11,6 +11,8 @@ To use this feature, pass allow_delays_and_stalls = true to the ctor of WBM (ren
setup delay requests starting from (start_delay_percent * _buffer_size) / 100 (default value is 70) (start_delay_percent is another WBM ctor parameter). Changes to the WBM's memory are tracked in WriteBufferManager::ReserveMem and FreeMem.
Once the WBM reached its capacity, writes will be stopped using the old ShouldStall() and WBMStallWrites(). (#423)

* Snapshot optimization - The most important piece of information in a snapshot is its sequence number, which lets compaction decide whether a key-value pair may be deleted. The sequence number changes whenever the db is modified. This feature allows the db to take a snapshot without acquiring the db mutex when the last snapshot has the same sequence number as the new one. In a transactional db with mostly read operations, this should improve performance in multithreaded environments, as well as in other scenarios that take a large number of snapshots under read-mostly workloads.

### Enhancements
* CI: add a workflow for building and publishing jar to maven central (#507)
* LOG: Compaction job traces - report cf name and job id (#511)
118 changes: 118 additions & 0 deletions db/db_impl/db_impl.cc
@@ -3711,7 +3711,57 @@ Status DBImpl::GetTimestampedSnapshots(
timestamped_snapshots_.GetSnapshots(ts_lb, ts_ub, timestamped_snapshots);
return Status::OK();
}
#ifdef ROCKSDB_SNAP_OPTIMIZATION
SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
bool lock) {
int64_t unix_time = 0;
immutable_db_options_.clock->GetCurrentTime(&unix_time)
.PermitUncheckedError(); // Ignore error
SnapshotImpl* s = new SnapshotImpl;
std::shared_ptr<SnapshotImpl> snap = snapshots_.last_snapshot_;
if (snap && snap->GetSequenceNumber() == GetLastPublishedSequence() &&
snap->is_write_conflict_boundary_ == is_write_conflict_boundary) {
s->cached_snapshot = snap;
snapshots_.count_.fetch_add(1);
snap->refcount.fetch_add(1);
s->number_ = snap->GetSequenceNumber();
s->unix_time_ = unix_time;
s->is_write_conflict_boundary_ = is_write_conflict_boundary;
return s;
}
snap = nullptr;

if (lock) {
mutex_.Lock();
} else {
mutex_.AssertHeld();
}
// returns null if the underlying memtable does not support snapshot.
if (!is_snapshot_supported_) {
if (lock) {
mutex_.Unlock();
}
delete s;
return nullptr;
}
auto snapshot_seq = GetLastPublishedSequence();
SnapshotImpl* snapshot =
snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
SnapshotImpl* user_snapshot = new SnapshotImpl;
auto new_last_snapshot =
std::shared_ptr<SnapshotImpl>(snapshot, SnapshotImpl::Deleter{});
user_snapshot->cached_snapshot = new_last_snapshot;
snapshots_.last_snapshot_ = new_last_snapshot;
user_snapshot->unix_time_ = unix_time;
user_snapshot->is_write_conflict_boundary_ = is_write_conflict_boundary;
user_snapshot->number_ = snapshot_seq;
if (lock) {
mutex_.Unlock();
}

return user_snapshot;
}
#else
SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
bool lock) {
int64_t unix_time = 0;
@@ -3740,6 +3790,7 @@ SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
}
return snapshot;
}
#endif

std::pair<Status, std::shared_ptr<const SnapshotImpl>>
DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts,
@@ -3861,7 +3912,73 @@ bool CfdListContains(const CfdList& list, ColumnFamilyData* cfd) {
return false;
}
} // namespace
#ifdef ROCKSDB_SNAP_OPTIMIZATION
void DBImpl::ReleaseSnapshot(const Snapshot* s) {
if (s == nullptr) {
// DBImpl::GetSnapshot() can return nullptr when snapshot
// not supported by specifying the condition:
// inplace_update_support enabled.
return;
}
snapshots_.count_.fetch_sub(1);
const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s);
SnapshotImpl* snapshot = const_cast<SnapshotImpl*>(casted_s);
if (snapshot->cached_snapshot) {
size_t cnt = snapshot->cached_snapshot->refcount.fetch_sub(1);
if (cnt < 2) {
snapshots_.last_snapshot_.compare_exchange_weak(snapshot->cached_snapshot, nullptr);
}
delete snapshot;
}
if (!snapshots_.deleteitem) {
return;
}
{
InstrumentedMutexLock l(&mutex_);
std::scoped_lock<std::mutex> snaplock(snapshots_.lock);
snapshots_.deleteitem = false;
uint64_t oldest_snapshot;
if (snapshots_.empty()) {
oldest_snapshot = GetLastPublishedSequence();
} else {
oldest_snapshot = snapshots_.oldest()->number_;
}
// Avoid to go through every column family by checking a global threshold
// first.
if (oldest_snapshot > bottommost_files_mark_threshold_) {
CfdList cf_scheduled;
for (auto* cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->ioptions()->allow_ingest_behind) {
cfd->current()->storage_info()->UpdateOldestSnapshot(oldest_snapshot);
if (!cfd->current()
->storage_info()
->BottommostFilesMarkedForCompaction()
.empty()) {
SchedulePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction();
cf_scheduled.push_back(cfd);
}
}
}

// Calculate a new threshold, skipping those CFs where compactions are
// scheduled. We do not do the same pass as the previous loop because
// mutex might be unlocked during the loop, making the result inaccurate.
SequenceNumber new_bottommost_files_mark_threshold = kMaxSequenceNumber;
for (auto* cfd : *versions_->GetColumnFamilySet()) {
if (CfdListContains(cf_scheduled, cfd) ||
cfd->ioptions()->allow_ingest_behind) {
continue;
}
new_bottommost_files_mark_threshold = std::min(
new_bottommost_files_mark_threshold,
cfd->current()->storage_info()->bottommost_files_mark_threshold());
}
bottommost_files_mark_threshold_ = new_bottommost_files_mark_threshold;
}
}
}
#else
void DBImpl::ReleaseSnapshot(const Snapshot* s) {
if (s == nullptr) {
// DBImpl::GetSnapshot() can return nullptr when snapshot
@@ -3915,6 +4032,7 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
}
delete casted_s;
}
#endif

Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
TablePropertiesCollection* props) {
58 changes: 54 additions & 4 deletions db/snapshot_impl.h
@@ -12,7 +12,13 @@

#include "db/dbformat.h"
#include "rocksdb/db.h"
#include "rocksdb/types.h"
#include "util/autovector.h"
// Will keep this include enabled if the performance tests require it
#include "folly/concurrency/AtomicSharedPtr.h"
#include <iostream>
#include <mutex>

namespace ROCKSDB_NAMESPACE {

@@ -22,17 +28,29 @@ class SnapshotList;
// Each SnapshotImpl corresponds to a particular sequence number.
class SnapshotImpl : public Snapshot {
public:
#ifdef ROCKSDB_SNAP_OPTIMIZATION
std::atomic_uint64_t refcount = {1};
std::shared_ptr<SnapshotImpl> cached_snapshot = nullptr;

struct Deleter {
inline void operator()(SnapshotImpl* snap) const;
};
int64_t unix_time_;
uint64_t timestamp_;
// Will this snapshot be used by a Transaction to do write-conflict checking?
bool is_write_conflict_boundary_;
#endif
SequenceNumber number_; // const after creation
// It indicates the smallest uncommitted data at the time the snapshot was
// taken. This is currently used by WritePrepared transactions to limit the
// scope of queries to IsInSnapshot.
SequenceNumber min_uncommitted_ = kMinUnCommittedSeq;

SequenceNumber GetSequenceNumber() const override { return number_; }
uint64_t GetTimestamp() const override { return timestamp_; }

int64_t GetUnixTime() const override { return unix_time_; }

uint64_t GetTimestamp() const override { return timestamp_; }
SequenceNumber GetSequenceNumber() const override { return number_; }

private:
friend class SnapshotList;
@@ -42,17 +60,25 @@ class SnapshotImpl : public Snapshot {
SnapshotImpl* next_;

SnapshotList* list_; // just for sanity checks

#ifndef ROCKSDB_SNAP_OPTIMIZATION
int64_t unix_time_;

uint64_t timestamp_;

// Will this snapshot be used by a Transaction to do write-conflict checking?
bool is_write_conflict_boundary_;
#endif
};

class SnapshotList {
public:
#ifdef ROCKSDB_SNAP_OPTIMIZATION
mutable std::mutex lock;
bool deleteitem = false;
// If folly::atomic_shared_ptr provides a significant performance gain,
// it will be kept as the solution
folly::atomic_shared_ptr<SnapshotImpl> last_snapshot_;
#endif
SnapshotList() {
list_.prev_ = &list_;
list_.next_ = &list_;
@@ -63,6 +89,9 @@
list_.timestamp_ = 0;
list_.is_write_conflict_boundary_ = false;
count_ = 0;
#ifdef ROCKSDB_SNAP_OPTIMIZATION
last_snapshot_ = nullptr;
#endif
}

// No copy-construct.
@@ -84,6 +113,9 @@
SnapshotImpl* New(SnapshotImpl* s, SequenceNumber seq, uint64_t unix_time,
bool is_write_conflict_boundary,
uint64_t ts = std::numeric_limits<uint64_t>::max()) {
#ifdef ROCKSDB_SNAP_OPTIMIZATION
std::scoped_lock<std::mutex> l(lock);
#endif
s->number_ = seq;
s->unix_time_ = unix_time;
s->timestamp_ = ts;
@@ -118,6 +150,9 @@
void GetAll(std::vector<SequenceNumber>* snap_vector,
SequenceNumber* oldest_write_conflict_snapshot = nullptr,
const SequenceNumber& max_seq = kMaxSequenceNumber) const {
#ifdef ROCKSDB_SNAP_OPTIMIZATION
std::scoped_lock<std::mutex> l(lock);
#endif
std::vector<SequenceNumber>& ret = *snap_vector;
// So far we have no use case that would pass a non-empty vector
assert(ret.size() == 0);
@@ -177,11 +212,16 @@
}

uint64_t count() const { return count_; }
#ifdef ROCKSDB_SNAP_OPTIMIZATION
std::atomic_uint64_t count_;
#endif

private:
// Dummy head of doubly-linked list of snapshots
SnapshotImpl list_;
#ifndef ROCKSDB_SNAP_OPTIMIZATION
uint64_t count_;
#endif
};

// All operations on TimestampedSnapshotList must be protected by db mutex.
@@ -235,5 +275,15 @@
private:
std::map<uint64_t, std::shared_ptr<const SnapshotImpl>> snapshots_;
};

#ifdef ROCKSDB_SNAP_OPTIMIZATION
inline void SnapshotImpl::Deleter::operator()(SnapshotImpl* snap) const {
if (snap->cached_snapshot == nullptr) {
std::scoped_lock<std::mutex> l(snap->list_->lock);
snap->prev_->next_ = snap->next_;
snap->next_->prev_ = snap->prev_;
snap->list_->deleteitem = true;
}
delete snap;
}
#endif
} // namespace ROCKSDB_NAMESPACE
30 changes: 17 additions & 13 deletions examples/speedb_is_awesome_example.cc
@@ -13,7 +13,7 @@
// limitations under the License.

#include <iostream>

#include <thread>
#include <unordered_set>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

@@ -39,18 +39,22 @@ int main() {
assert(s.ok());

// append new entry
std::string key = "key_1";
std::string put_value = "Speedb is awesome!";
s = db->Put(WriteOptions(), key, put_value);
assert(s.ok());

// retrieve entry
std::string get_value;
s = db->Get(ReadOptions(), key, &get_value);
assert(s.ok());
assert(get_value == put_value);
std::cout << get_value << std::endl;

std::vector<std::thread> threads(8);
auto lambda = [=]() {
std::unordered_set<const rocksdb::Snapshot*> snaps;
for (int i = 0; i < 1000000; i++) {
snaps.emplace(db->GetSnapshot());
}
for (auto* snap : snaps) {
db->ReleaseSnapshot(snap);
}
};
for (size_t i = 0; i < threads.size(); i++) {
threads[i] = std::thread(lambda);
}
for (size_t i = 0; i < threads.size(); i++) {
threads[i].join();
}
// close DB
s = db->Close();
assert(s.ok());
