From 14177a2fc5660a3dbc1cdee599a8a82619e95d74 Mon Sep 17 00:00:00 2001 From: Ayelet Shani Date: Wed, 14 Dec 2022 12:13:59 +0200 Subject: [PATCH] generic switch memtable allows the ability to pre create memtable that allocated memory as part of the constructor this will reduce the penelty time of switch mutable memtable which by definition will effect write flow (#296) --- include/rocksdb/memtablerep.h | 103 ++++++++++++++++++++++++- options/customizable_test.cc | 3 +- plugin/speedb/memtable/hash_spd_rep.cc | 59 +++----------- plugin/speedb/memtable/hash_spd_rep.h | 17 ++-- 4 files changed, 119 insertions(+), 63 deletions(-) diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index cb5444dca3..bd56bcd69f 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -38,8 +38,11 @@ #include #include +#include #include +#include #include +#include #include #include "rocksdb/customizable.h" @@ -294,8 +297,36 @@ class MemTableRep { // new MemTableRep objects class MemTableRepFactory : public Customizable { public: - ~MemTableRepFactory() override {} + MemTableRepFactory() {} + + ~MemTableRepFactory() override { + if (enable_switch_memtable_) { + { + std::unique_lock lck(switch_memtable_thread_mutex_); + terminate_switch_memtable_.store(true); + } + switch_memtable_thread_cv_.notify_one(); + switch_memtable_thread_.join(); + + const MemTableRep* memtable = switch_mem_.exchange(nullptr); + if (memtable != nullptr) { + delete memtable; + } + } + } + void Init() { + switch_memtable_thread_ = + std::thread(&MemTableRepFactory::PrepareSwitchMemTable, this); + // need to verify the thread was executed + { + std::unique_lock lck(switch_memtable_thread_mutex_); + while (!switch_memtable_thread_init_.load()) { + switch_memtable_thread_cv_.wait(lck); + } + } + enable_switch_memtable_ = true; + } static const char* Type() { return "MemTableRepFactory"; } static Status CreateFromString(const ConfigOptions& config_options, const std::string& id, @@ -311,7 +342,11 @@ class MemTableRepFactory : public Customizable { const MemTableRep::KeyComparator& key_cmp, Allocator* allocator, const SliceTransform* slice_transform, Logger* logger, uint32_t /* column_family_id */) { - return CreateMemTableRep(key_cmp, allocator, slice_transform, logger); + if (enable_switch_memtable_) { + return GetSwitchMemtable(key_cmp, allocator, slice_transform, logger); + } else { + return CreateMemTableRep(key_cmp, allocator, slice_transform, logger); + } } const char* Name() const override = 0; @@ -325,6 +360,70 @@ class MemTableRepFactory : public Customizable { // false when if the already exists. // Default: false virtual bool CanHandleDuplicatedKey() const { return false; } + virtual MemTableRep* PreCreateMemTableRep() { return nullptr; } + virtual void PostCreateMemTableRep( + MemTableRep* /*switch_mem*/, + const MemTableRep::KeyComparator& /*key_cmp*/, Allocator* /*allocator*/, + const SliceTransform* /*slice_transform*/, Logger* /*logger*/) {} + void PrepareSwitchMemTable() { + { + std::unique_lock lck(switch_memtable_thread_mutex_); + switch_memtable_thread_init_.store(true); + } + switch_memtable_thread_cv_.notify_one(); + for (;;) { + { + std::unique_lock lck(switch_memtable_thread_mutex_); + while (switch_mem_.load(std::memory_order_acquire) != nullptr) { + if (terminate_switch_memtable_.load()) { + return; + } + + switch_memtable_thread_cv_.wait(lck); + } + } + + // Construct new memtable only for the heavy object initilized proposed + + switch_mem_.store(PreCreateMemTableRep(), std::memory_order_release); + } + } + + MemTableRep* GetSwitchMemtable(const MemTableRep::KeyComparator& key_cmp, + Allocator* allocator, + const SliceTransform* slice_transform, + Logger* logger) { + MemTableRep* switch_mem = nullptr; + { + std::unique_lock lck(switch_memtable_thread_mutex_); + switch_mem = switch_mem_.exchange(nullptr, std::memory_order_release); + } + switch_memtable_thread_cv_.notify_one(); + + if (switch_mem == nullptr) { + // No point in suspending, just construct the memtable here + switch_mem = + CreateMemTableRep(key_cmp, allocator, slice_transform, logger); + } else { + PostCreateMemTableRep(switch_mem, key_cmp, allocator, slice_transform, + logger); + } + return switch_mem; + } + + public: + // true if the current MemTableRep supports prepare memtable creation + // note that if it does the memtable contruction MUST NOT use any arena + // allocation!!! Default: false + bool enable_switch_memtable_ = false; + + private: + std::thread switch_memtable_thread_; + std::mutex switch_memtable_thread_mutex_; + std::condition_variable switch_memtable_thread_cv_; + std::atomic terminate_switch_memtable_ = false; + std::atomic switch_memtable_thread_init_ = false; + std::atomic switch_mem_ = nullptr; }; // This uses a skip list to store keys. It is the default. diff --git a/options/customizable_test.cc b/options/customizable_test.cc index 9d3c86c620..297a51eccf 100644 --- a/options/customizable_test.cc +++ b/options/customizable_test.cc @@ -1993,11 +1993,12 @@ TEST_F(LoadCustomizableTest, LoadMemTableRepFactoryTest) { std::shared_ptr factory; Status s = TestExpectedBuiltins( "SpecialSkipListFactory", expected, &factory, &failures); - // There is a "cuckoo" factory registered that we expect to fail. Ignore the + // There is a "cuckoo" factory registerexd that we expect to fail. Ignore the // error if this is the one if (s.ok() || failures.size() > 1 || failures[0] != "cuckoo") { ASSERT_OK(s); } + factory = nullptr; if (RegisterTests("Test")) { ExpectCreateShared("SpecialSkipListFactory"); } diff --git a/plugin/speedb/memtable/hash_spd_rep.cc b/plugin/speedb/memtable/hash_spd_rep.cc index b68214bb04..abd526ac13 100644 --- a/plugin/speedb/memtable/hash_spd_rep.cc +++ b/plugin/speedb/memtable/hash_spd_rep.cc @@ -573,64 +573,25 @@ static std::unordered_map hash_spd_factory_info = { HashSpdRepFactory::HashSpdRepFactory(size_t bucket_count) : bucket_count_(bucket_count) { RegisterOptions("", &bucket_count_, &hash_spd_factory_info); - switch_memtable_thread_ = - std::thread(&HashSpdRepFactory::PrepareSwitchMemTable, this); + Init(); } -// HashSpdRepFactory - -HashSpdRepFactory::~HashSpdRepFactory() { - { - std::unique_lock lck(switch_memtable_thread_mutex_); - terminate_switch_memtable_ = true; - } - switch_memtable_thread_cv_.notify_one(); - switch_memtable_thread_.join(); - - const MemTableRep* memtable = switch_mem_.exchange(nullptr); - if (memtable != nullptr) { - delete memtable; - } -} MemTableRep* HashSpdRepFactory::CreateMemTableRep( const MemTableRep::KeyComparator& compare, Allocator* allocator, const SliceTransform* /*transform*/, Logger* /*logger*/) { - return GetSwitchMemtable(compare, allocator); + return new HashSpdRep(compare, allocator, bucket_count_); } -void HashSpdRepFactory::PrepareSwitchMemTable() { - for (;;) { - { - std::unique_lock lck(switch_memtable_thread_mutex_); - while (switch_mem_.load(std::memory_order_acquire) != nullptr) { - if (terminate_switch_memtable_) { - return; - } - - switch_memtable_thread_cv_.wait(lck); - } - } - switch_mem_.store(new HashSpdRep(nullptr, bucket_count_), - std::memory_order_release); - } +MemTableRep* HashSpdRepFactory::PreCreateMemTableRep() { + MemTableRep* hash_spd = new HashSpdRep(nullptr, bucket_count_); + return hash_spd; } -MemTableRep* HashSpdRepFactory::GetSwitchMemtable( - const MemTableRep::KeyComparator& compare, Allocator* allocator) { - MemTableRep* switch_mem = nullptr; - { - std::unique_lock lck(switch_memtable_thread_mutex_); - switch_mem = switch_mem_.exchange(nullptr, std::memory_order_release); - } - switch_memtable_thread_cv_.notify_one(); - - if (switch_mem == nullptr) { - // No point in suspending, just construct the memtable here - switch_mem = new HashSpdRep(compare, allocator, bucket_count_); - } else { - static_cast(switch_mem)->PostCreate(compare, allocator); - } - return switch_mem; +void HashSpdRepFactory::PostCreateMemTableRep( + MemTableRep* switch_mem, const MemTableRep::KeyComparator& compare, + Allocator* allocator, const SliceTransform* /*transform*/, + Logger* /*logger*/) { + static_cast(switch_mem)->PostCreate(compare, allocator); } } // namespace ROCKSDB_NAMESPACE diff --git a/plugin/speedb/memtable/hash_spd_rep.h b/plugin/speedb/memtable/hash_spd_rep.h index 6d79b08992..1eb0ffab7d 100644 --- a/plugin/speedb/memtable/hash_spd_rep.h +++ b/plugin/speedb/memtable/hash_spd_rep.h @@ -26,7 +26,6 @@ namespace ROCKSDB_NAMESPACE { class HashSpdRepFactory : public MemTableRepFactory { public: explicit HashSpdRepFactory(size_t bucket_count = 1000000); - ~HashSpdRepFactory() override; using MemTableRepFactory::CreateMemTableRep; MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& compare, @@ -35,22 +34,18 @@ class HashSpdRepFactory : public MemTableRepFactory { Logger* logger) override; bool IsInsertConcurrentlySupported() const override { return true; } bool CanHandleDuplicatedKey() const override { return true; } + MemTableRep* PreCreateMemTableRep() override; + void PostCreateMemTableRep(MemTableRep* switch_mem, + const MemTableRep::KeyComparator& compare, + Allocator* allocator, + const SliceTransform* transform, + Logger* logger) override; static const char* kClassName() { return "speedb.HashSpdRepFactory"; } const char* Name() const override { return kClassName(); } - private: - void PrepareSwitchMemTable(); - MemTableRep* GetSwitchMemtable(const MemTableRep::KeyComparator& compare, - Allocator* allocator); - private: size_t bucket_count_; - std::thread switch_memtable_thread_; - std::mutex switch_memtable_thread_mutex_; - std::condition_variable switch_memtable_thread_cv_; - bool terminate_switch_memtable_ = false; - std::atomic switch_mem_ = nullptr; }; } // namespace ROCKSDB_NAMESPACE