Skip to content

Commit

Permalink
generic switch memtable allows the ability to pre create memtable tha…
Browse files Browse the repository at this point in the history
…t allocated memory as part of the constructor

this will reduce the penelty time of switch mutable memtable which by definition will effect write flow (#296)
  • Loading branch information
ayulas authored and Yuval-Ariel committed Dec 18, 2022
1 parent a8226e7 commit 14177a2
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 63 deletions.
103 changes: 101 additions & 2 deletions include/rocksdb/memtablerep.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@
#include <stdint.h>
#include <stdlib.h>

#include <condition_variable>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <unordered_set>

#include "rocksdb/customizable.h"
Expand Down Expand Up @@ -294,8 +297,36 @@ class MemTableRep {
// new MemTableRep objects
class MemTableRepFactory : public Customizable {
public:
~MemTableRepFactory() override {}
MemTableRepFactory() {}

~MemTableRepFactory() override {
if (enable_switch_memtable_) {
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
terminate_switch_memtable_.store(true);
}
switch_memtable_thread_cv_.notify_one();
switch_memtable_thread_.join();

const MemTableRep* memtable = switch_mem_.exchange(nullptr);
if (memtable != nullptr) {
delete memtable;
}
}
}

void Init() {
switch_memtable_thread_ =
std::thread(&MemTableRepFactory::PrepareSwitchMemTable, this);
// need to verify the thread was executed
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
while (!switch_memtable_thread_init_.load()) {
switch_memtable_thread_cv_.wait(lck);
}
}
enable_switch_memtable_ = true;
}
static const char* Type() { return "MemTableRepFactory"; }
static Status CreateFromString(const ConfigOptions& config_options,
const std::string& id,
Expand All @@ -311,7 +342,11 @@ class MemTableRepFactory : public Customizable {
const MemTableRep::KeyComparator& key_cmp, Allocator* allocator,
const SliceTransform* slice_transform, Logger* logger,
uint32_t /* column_family_id */) {
return CreateMemTableRep(key_cmp, allocator, slice_transform, logger);
if (enable_switch_memtable_) {
return GetSwitchMemtable(key_cmp, allocator, slice_transform, logger);
} else {
return CreateMemTableRep(key_cmp, allocator, slice_transform, logger);
}
}

const char* Name() const override = 0;
Expand All @@ -325,6 +360,70 @@ class MemTableRepFactory : public Customizable {
// false when if the <key,seq> already exists.
// Default: false
virtual bool CanHandleDuplicatedKey() const { return false; }
virtual MemTableRep* PreCreateMemTableRep() { return nullptr; }
virtual void PostCreateMemTableRep(
MemTableRep* /*switch_mem*/,
const MemTableRep::KeyComparator& /*key_cmp*/, Allocator* /*allocator*/,
const SliceTransform* /*slice_transform*/, Logger* /*logger*/) {}
void PrepareSwitchMemTable() {
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
switch_memtable_thread_init_.store(true);
}
switch_memtable_thread_cv_.notify_one();
for (;;) {
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
while (switch_mem_.load(std::memory_order_acquire) != nullptr) {
if (terminate_switch_memtable_.load()) {
return;
}

switch_memtable_thread_cv_.wait(lck);
}
}

// Construct new memtable only for the heavy object initilized proposed

switch_mem_.store(PreCreateMemTableRep(), std::memory_order_release);
}
}

MemTableRep* GetSwitchMemtable(const MemTableRep::KeyComparator& key_cmp,
Allocator* allocator,
const SliceTransform* slice_transform,
Logger* logger) {
MemTableRep* switch_mem = nullptr;
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
switch_mem = switch_mem_.exchange(nullptr, std::memory_order_release);
}
switch_memtable_thread_cv_.notify_one();

if (switch_mem == nullptr) {
// No point in suspending, just construct the memtable here
switch_mem =
CreateMemTableRep(key_cmp, allocator, slice_transform, logger);
} else {
PostCreateMemTableRep(switch_mem, key_cmp, allocator, slice_transform,
logger);
}
return switch_mem;
}

public:
// true if the current MemTableRep supports prepare memtable creation
// note that if it does the memtable contruction MUST NOT use any arena
// allocation!!! Default: false
bool enable_switch_memtable_ = false;

private:
std::thread switch_memtable_thread_;
std::mutex switch_memtable_thread_mutex_;
std::condition_variable switch_memtable_thread_cv_;
std::atomic<bool> terminate_switch_memtable_ = false;
std::atomic<bool> switch_memtable_thread_init_ = false;
std::atomic<MemTableRep*> switch_mem_ = nullptr;
};

// This uses a skip list to store keys. It is the default.
Expand Down
3 changes: 2 additions & 1 deletion options/customizable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1993,11 +1993,12 @@ TEST_F(LoadCustomizableTest, LoadMemTableRepFactoryTest) {
std::shared_ptr<MemTableRepFactory> factory;
Status s = TestExpectedBuiltins<MemTableRepFactory>(
"SpecialSkipListFactory", expected, &factory, &failures);
// There is a "cuckoo" factory registered that we expect to fail. Ignore the
// There is a "cuckoo" factory registerexd that we expect to fail. Ignore the
// error if this is the one
if (s.ok() || failures.size() > 1 || failures[0] != "cuckoo") {
ASSERT_OK(s);
}
factory = nullptr;
if (RegisterTests("Test")) {
ExpectCreateShared<MemTableRepFactory>("SpecialSkipListFactory");
}
Expand Down
59 changes: 10 additions & 49 deletions plugin/speedb/memtable/hash_spd_rep.cc
Original file line number Diff line number Diff line change
Expand Up @@ -573,64 +573,25 @@ static std::unordered_map<std::string, OptionTypeInfo> hash_spd_factory_info = {
HashSpdRepFactory::HashSpdRepFactory(size_t bucket_count)
: bucket_count_(bucket_count) {
RegisterOptions("", &bucket_count_, &hash_spd_factory_info);
switch_memtable_thread_ =
std::thread(&HashSpdRepFactory::PrepareSwitchMemTable, this);
Init();
}

// HashSpdRepFactory

HashSpdRepFactory::~HashSpdRepFactory() {
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
terminate_switch_memtable_ = true;
}
switch_memtable_thread_cv_.notify_one();
switch_memtable_thread_.join();

const MemTableRep* memtable = switch_mem_.exchange(nullptr);
if (memtable != nullptr) {
delete memtable;
}
}
MemTableRep* HashSpdRepFactory::CreateMemTableRep(
const MemTableRep::KeyComparator& compare, Allocator* allocator,
const SliceTransform* /*transform*/, Logger* /*logger*/) {
return GetSwitchMemtable(compare, allocator);
return new HashSpdRep(compare, allocator, bucket_count_);
}

void HashSpdRepFactory::PrepareSwitchMemTable() {
for (;;) {
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
while (switch_mem_.load(std::memory_order_acquire) != nullptr) {
if (terminate_switch_memtable_) {
return;
}

switch_memtable_thread_cv_.wait(lck);
}
}
switch_mem_.store(new HashSpdRep(nullptr, bucket_count_),
std::memory_order_release);
}
MemTableRep* HashSpdRepFactory::PreCreateMemTableRep() {
MemTableRep* hash_spd = new HashSpdRep(nullptr, bucket_count_);
return hash_spd;
}

MemTableRep* HashSpdRepFactory::GetSwitchMemtable(
const MemTableRep::KeyComparator& compare, Allocator* allocator) {
MemTableRep* switch_mem = nullptr;
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
switch_mem = switch_mem_.exchange(nullptr, std::memory_order_release);
}
switch_memtable_thread_cv_.notify_one();

if (switch_mem == nullptr) {
// No point in suspending, just construct the memtable here
switch_mem = new HashSpdRep(compare, allocator, bucket_count_);
} else {
static_cast<HashSpdRep*>(switch_mem)->PostCreate(compare, allocator);
}
return switch_mem;
void HashSpdRepFactory::PostCreateMemTableRep(
MemTableRep* switch_mem, const MemTableRep::KeyComparator& compare,
Allocator* allocator, const SliceTransform* /*transform*/,
Logger* /*logger*/) {
static_cast<HashSpdRep*>(switch_mem)->PostCreate(compare, allocator);
}

} // namespace ROCKSDB_NAMESPACE
Expand Down
17 changes: 6 additions & 11 deletions plugin/speedb/memtable/hash_spd_rep.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ namespace ROCKSDB_NAMESPACE {
class HashSpdRepFactory : public MemTableRepFactory {
public:
explicit HashSpdRepFactory(size_t bucket_count = 1000000);
~HashSpdRepFactory() override;

using MemTableRepFactory::CreateMemTableRep;
MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& compare,
Expand All @@ -35,22 +34,18 @@ class HashSpdRepFactory : public MemTableRepFactory {
Logger* logger) override;
bool IsInsertConcurrentlySupported() const override { return true; }
bool CanHandleDuplicatedKey() const override { return true; }
MemTableRep* PreCreateMemTableRep() override;
void PostCreateMemTableRep(MemTableRep* switch_mem,
const MemTableRep::KeyComparator& compare,
Allocator* allocator,
const SliceTransform* transform,
Logger* logger) override;

static const char* kClassName() { return "speedb.HashSpdRepFactory"; }
const char* Name() const override { return kClassName(); }

private:
void PrepareSwitchMemTable();
MemTableRep* GetSwitchMemtable(const MemTableRep::KeyComparator& compare,
Allocator* allocator);

private:
size_t bucket_count_;
std::thread switch_memtable_thread_;
std::mutex switch_memtable_thread_mutex_;
std::condition_variable switch_memtable_thread_cv_;
bool terminate_switch_memtable_ = false;
std::atomic<MemTableRep*> switch_mem_ = nullptr;
};

} // namespace ROCKSDB_NAMESPACE
Expand Down

0 comments on commit 14177a2

Please sign in to comment.