Snapshot Optimization (speedb-io#35)
Motivation:
The most important piece of information inside a snapshot is its sequence number, which lets compaction decide whether a key-value pair can be deleted. The sequence number changes whenever the db is modified. This feature allows the db to take a snapshot without acquiring the db mutex when the last snapshot has the same sequence number as the new one. In a transactional db with mostly read operations, this should improve performance in multithreaded environments, as well as in other scenarios that take a large number of snapshots with mostly read operations.

This feature adds a new atomic_shared_ptr that uses a mutex to
synchronize access to the underlying shared_ptr.
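
As a rough usage sketch (the full class is added to include/rocksdb/types.h below; Node is a hypothetical payload type used only for illustration), every load and store goes through the wrapper's internal mutex, so concurrent readers and writers always see a consistent shared_ptr:

// Sketch only: exercises the rocksdb::atomic_shared_ptr added by this commit.
#include <memory>
#include <thread>
#include "rocksdb/types.h"

struct Node { int value = 0; };  // hypothetical payload

int main() {
  rocksdb::atomic_shared_ptr<Node> last;   // starts out as nullptr
  last.store(std::make_shared<Node>());    // store under the internal mutex
  std::thread reader([&] {
    std::shared_ptr<Node> snap = last.load();  // copy out under the mutex
    if (snap) {
      snap->value += 1;  // the copy can be used without holding any lock
    }
  });
  reader.join();
  return 0;
}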

To cache snapshots, SnapshotList holds last_snapshot_ (an
atomic_shared_ptr) that points to the most recently created snapshot.
On every GetSnapshotImpl call (where snapshots are created), the
function checks whether the current sequence number differs from that of
last_snapshot_. If it does not, it creates a new snapshot that holds a
reference to last_snapshot_ (the reference is cached_snapshot), so the
sequence number stays inside SnapshotList (the list of snapshots in the
system, used by compaction to determine which snapshots are in use) for
as long as any snapshot still holds it. If the sequence number has
changed, or last_snapshot_ is nullptr, the snapshot is created while
acquiring the db mutex.
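
In outline (a simplified sketch of the GetSnapshotImpl hunk below; timestamp plumbing, the write-conflict-boundary check, and error handling are omitted):

// Sketch only: condensed from the db_impl.cc hunk in this commit.
SnapshotImpl* GetSnapshotSketch() {
  std::shared_ptr<SnapshotImpl> last = snapshots_.last_snapshot_;
  if (last && last->GetSequenceNumber() == GetLastPublishedSequence()) {
    // Fast path: no db mutex. The returned wrapper pins the cached
    // snapshot, keeping its sequence number alive in SnapshotList.
    SnapshotImpl* user = new SnapshotImpl;
    user->cached_snapshot = last;
    user->number_ = last->GetSequenceNumber();
    return user;
  }
  // Slow path: take the db mutex, insert a real node into SnapshotList,
  // publish it as last_snapshot_, and return a wrapper that pins it.
  ...
}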

For ReleaseSnapshotImpl (deleting a snapshot): we unref last_snapshot_,
and if the refcount drops to 0, the Deleter is invoked, which removes the
snapshot entirely from the SnapshotList before the db mutex is taken.
If references remain, the call returns without removing the snapshot from
the SnapshotList and without taking the db mutex.
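
The release side in the same condensed style (again a sketch; the actual hunk in db_impl.cc below differs in member access and locking details):

// Sketch only: condensed from DBImpl::ReleaseSnapshot in this commit.
void ReleaseSnapshotSketch(SnapshotImpl* snapshot) {
  snapshots_.count_.fetch_sub(1);
  // LoadSnap() drops last_snapshot_'s own reference when the snapshot
  // being released is the only other holder (use_count() == 2).
  auto cached = snapshots_.last_snapshot_.LoadSnap();
  // Destroying the wrapper releases cached_snapshot; at refcount 0 the
  // Deleter unlinks the node from SnapshotList and sets deleteitem.
  delete snapshot;
  cached = nullptr;
  if (!snapshots_.deleteitem) {
    return;  // other snapshots still pin this sequence number: no db mutex
  }
  // The node was unlinked: take the db mutex and recompute the oldest
  // snapshot and the compaction thresholds.
}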
Or Friedmann committed Jun 8, 2023
1 parent 1788bc7 commit 8b9a149
Showing 4 changed files with 142 additions and 13 deletions.
2 changes: 2 additions & 0 deletions HISTORY.md
@@ -11,6 +11,8 @@ To use this feature, pass allow_delays_and_stalls = true to the ctor of WBM (ren
setup delay requests starting from (start_delay_percent * _buffer_size) / 100 (default value is 70) (start_delay_percent is another WBM ctor parameter). Changes to the WBM's memory are tracked in WriteBufferManager::ReserveMem and FreeMem.
Once the WBM reached its capacity, writes will be stopped using the old ShouldStall() and WBMStallWrites(). (#423)

* Snapshot optimization - The most important piece of information inside a snapshot is its sequence number, which lets compaction decide whether a key-value pair can be deleted. The sequence number changes whenever the db is modified. This feature allows the db to take a snapshot without acquiring the db mutex when the last snapshot has the same sequence number as the new one. In a transactional db with mostly read operations, this should improve performance in multithreaded environments, as well as in other scenarios that take a large number of snapshots with mostly read operations.

### Enhancements
* CI: add a workflow for building and publishing jar to maven central (#507)
* LOG: Compaction job traces - report cf name and job id (#511)
41 changes: 38 additions & 3 deletions db/db_impl/db_impl.cc
@@ -3717,7 +3717,18 @@ SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
int64_t unix_time = 0;
immutable_db_options_.clock->GetCurrentTime(&unix_time)
.PermitUncheckedError(); // Ignore error
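// Fast path added by this commit: if the cached last_snapshot_ still
// matches the last published sequence number, hand out a lightweight
// wrapper that pins it, without taking the db mutex.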
std::shared_ptr<SnapshotImpl> snap = snapshots_.last_snapshot_;
SnapshotImpl* s = new SnapshotImpl;
if (snap && snap->GetSequenceNumber() == GetLastPublishedSequence() &&
snap->is_write_conflict_boundary_ == is_write_conflict_boundary) {
s->cached_snapshot = snap;
snapshots_.count_.fetch_add(1);
s->number_ = snap->GetSequenceNumber();
s->unix_time_ = unix_time;
s->is_write_conflict_boundary_ = is_write_conflict_boundary;
return s;
}
snap = nullptr;

if (lock) {
mutex_.Lock();
@@ -3735,10 +3746,19 @@ SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
auto snapshot_seq = GetLastPublishedSequence();
SnapshotImpl* snapshot =
snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
SnapshotImpl* user_snapshot = new SnapshotImpl;
auto new_last_snapshot =
std::shared_ptr<SnapshotImpl>(snapshot, SnapshotImpl::Deleter{});
user_snapshot->cached_snapshot = new_last_snapshot;
snapshots_.last_snapshot_ = new_last_snapshot;
user_snapshot->unix_time_ = unix_time;
user_snapshot->is_write_conflict_boundary_ = is_write_conflict_boundary;
user_snapshot->number_ = snapshot_seq;
if (lock) {
mutex_.Unlock();
}
return snapshot;

return user_snapshot;
}

std::pair<Status, std::shared_ptr<const SnapshotImpl>>
@@ -3869,10 +3889,26 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
// inplace_update_support enabled.
return;
}
snapshots_.count_.fetch_sub(1);
const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s);
SnapshotImpl* snapshot = const_cast<SnapshotImpl*>(casted_s);
#ifdef USE_FOLLY
auto cached = snapshots_.last_snapshot_.load();
#else
auto cached = snapshots_.last_snapshot_.LoadSnap();
#endif

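// Deleting the wrapper drops its cached_snapshot reference; if that was
// the last one, SnapshotImpl::Deleter unlinks the node from SnapshotList
// and sets deleteitem so the mutex-protected cleanup below runs.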
if (snapshot->cached_snapshot != nullptr) {
delete snapshot;
}
cached = nullptr;
if (!snapshots_.deleteitem) {
return;
}
{
InstrumentedMutexLock l(&mutex_);
snapshots_.Delete(casted_s);
std::scoped_lock<std::mutex> snaplock(snapshots_.lock);
snapshots_.deleteitem = false;
uint64_t oldest_snapshot;
if (snapshots_.empty()) {
oldest_snapshot = GetLastPublishedSequence();
@@ -3913,7 +3949,6 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
bottommost_files_mark_threshold_ = new_bottommost_files_mark_threshold;
}
}
delete casted_s;
}

Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
54 changes: 44 additions & 10 deletions db/snapshot_impl.h
@@ -12,7 +12,15 @@

#include "db/dbformat.h"
#include "rocksdb/db.h"
#include "rocksdb/types.h"
#include "util/autovector.h"
/* will enable if the performance tests will require it
#ifdef USE_FOLLY
#include "folly/concurrency/AtomicSharedPtr.h"
#endif
*/
#include <iostream>
#include <mutex>

namespace ROCKSDB_NAMESPACE {

@@ -22,17 +30,27 @@ class SnapshotList;
// Each SnapshotImpl corresponds to a particular sequence number.
class SnapshotImpl : public Snapshot {
public:
int64_t unix_time_;
std::shared_ptr<Snapshot> cached_snapshot = nullptr;

uint64_t timestamp_;
// Will this snapshot be used by a Transaction to do write-conflict checking?
bool is_write_conflict_boundary_;
SequenceNumber number_; // const after creation
// It indicates the smallest uncommitted data at the time the snapshot was
// taken. This is currently used by WritePrepared transactions to limit the
// scope of queries to IsInSnapshot.
SequenceNumber min_uncommitted_ = kMinUnCommittedSeq;

SequenceNumber GetSequenceNumber() const override { return number_; }
uint64_t GetTimestamp() const override { return timestamp_; }

int64_t GetUnixTime() const override { return unix_time_; }

uint64_t GetTimestamp() const override { return timestamp_; }
SequenceNumber GetSequenceNumber() const override { return number_; }

struct Deleter {
inline void operator()(SnapshotImpl* snap) const;
};

private:
friend class SnapshotList;
@@ -42,17 +60,20 @@ class SnapshotImpl : public Snapshot {
SnapshotImpl* next_;

SnapshotList* list_; // just for sanity checks

int64_t unix_time_;

uint64_t timestamp_;

// Will this snapshot be used by a Transaction to do write-conflict checking?
bool is_write_conflict_boundary_;
};

class SnapshotList {
public:
mutable std::mutex lock;
bool deleteitem = false;
/* If the folly::atomic_shared_ptr will provide significant performance gain
it will be considered as a solution
#ifdef USE_FOLLY
folly::atomic_shared_ptr<SnapshotImpl> last_snapshot_;
#else
*/
mutable rocksdb::atomic_shared_ptr<SnapshotImpl> last_snapshot_;
//#endif
SnapshotList() {
list_.prev_ = &list_;
list_.next_ = &list_;
@@ -63,6 +84,7 @@
list_.timestamp_ = 0;
list_.is_write_conflict_boundary_ = false;
count_ = 0;
last_snapshot_ = nullptr;
}

// No copy-construct.
@@ -84,6 +106,7 @@
SnapshotImpl* New(SnapshotImpl* s, SequenceNumber seq, uint64_t unix_time,
bool is_write_conflict_boundary,
uint64_t ts = std::numeric_limits<uint64_t>::max()) {
std::scoped_lock<std::mutex> l(lock);
s->number_ = seq;
s->unix_time_ = unix_time;
s->timestamp_ = ts;
@@ -118,6 +141,7 @@
void GetAll(std::vector<SequenceNumber>* snap_vector,
SequenceNumber* oldest_write_conflict_snapshot = nullptr,
const SequenceNumber& max_seq = kMaxSequenceNumber) const {
std::scoped_lock<std::mutex> l(lock);
std::vector<SequenceNumber>& ret = *snap_vector;
// So far we have no use case that would pass a non-empty vector
assert(ret.size() == 0);
@@ -177,11 +201,11 @@
}

uint64_t count() const { return count_; }
std::atomic_uint64_t count_;

private:
// Dummy head of doubly-linked list of snapshots
SnapshotImpl list_;
uint64_t count_;
};

// All operations on TimestampedSnapshotList must be protected by db mutex.
@@ -236,4 +260,14 @@
std::map<uint64_t, std::shared_ptr<const SnapshotImpl>> snapshots_;
};

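// Invoked when the last shared_ptr reference to a snapshot goes away.
// Real SnapshotList nodes (cached_snapshot == nullptr) are unlinked under
// the list lock, and deleteitem is flagged so the releasing thread takes
// the db mutex and recomputes compaction state. Lightweight user wrappers
// (cached_snapshot set) are simply freed.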
inline void SnapshotImpl::Deleter::operator()(SnapshotImpl* snap) const {
if (snap->cached_snapshot == nullptr) {
std::scoped_lock<std::mutex> l(snap->list_->lock);
snap->prev_->next_ = snap->next_;
snap->next_->prev_ = snap->prev_;
snap->list_->deleteitem = true;
}
delete snap;
}

} // namespace ROCKSDB_NAMESPACE
58 changes: 58 additions & 0 deletions include/rocksdb/types.h
@@ -7,6 +7,10 @@

#include <stdint.h>

#include <memory>
#include <mutex>
#include <shared_mutex>

#include "rocksdb/slice.h"

namespace ROCKSDB_NAMESPACE {
@@ -91,4 +95,58 @@ enum class WriteStallCondition {
kNormal,
};

template <typename T>
class atomic_shared_ptr {
public:
atomic_shared_ptr() : data(nullptr) {}

explicit atomic_shared_ptr(std::shared_ptr<T> ptr) : data(ptr) {}

atomic_shared_ptr(const atomic_shared_ptr& other) {
std::lock_guard<std::mutex> lock(other.mutex);
data = other.data;
}

atomic_shared_ptr& operator=(const atomic_shared_ptr& other) {
if (this != &other) {
std::lock_guard<std::mutex> this_lock(mutex);
std::lock_guard<std::mutex> other_lock(other.mutex);
data = other.data;
}
return *this;
}
atomic_shared_ptr& operator=(std::shared_ptr<T> ptr) {
std::lock_guard<std::mutex> lock(mutex);
data = ptr;
return *this;
}
void store(std::shared_ptr<T> ptr) {
std::lock_guard<std::mutex> lock(mutex);
data = ptr;
}

std::shared_ptr<T> load() const {
std::lock_guard<std::mutex> lock(mutex);
return data;
}
// Special load used by ReleaseSnapshot: when the snapshot being released
// and last_snapshot_ itself are the only remaining holders
// (use_count() == 2), drop last_snapshot_'s reference so the node can be
// removed from the SnapshotList once the caller's reference goes away.
std::shared_ptr<T> LoadSnap() const {
std::lock_guard<std::mutex> lock(mutex);
if (data.use_count() == 2) {
data = nullptr;
}
return data;
}

operator std::shared_ptr<T>() const {
std::lock_guard<std::mutex> lock(mutex);
return data;
}

private:
mutable std::mutex mutex;
mutable std::shared_ptr<T> data;
};

} // namespace ROCKSDB_NAMESPACE
