Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
Or Friedmann committed Jul 3, 2023
1 parent a81e3ec commit 6359776
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 9 deletions.
2 changes: 1 addition & 1 deletion db/db_impl/db_spdb_impl_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class SpdbWriteImpl {
std::condition_variable flush_thread_cv_;
port::Mutex add_buffer_mutex_;
port::RWMutexWr flush_rwlock_;
std::thread flush_thread_;
port::Thread flush_thread_;
port::RWMutexWr wal_buffers_rwlock_;
port::Mutex wal_write_mutex_;
port::Mutex wb_list_mutex_;
Expand Down
5 changes: 3 additions & 2 deletions include/rocksdb/memtablerep.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include <thread>
#include <unordered_set>

#include "port/port.h"
#include "rocksdb/customizable.h"
#include "rocksdb/slice.h"

Expand Down Expand Up @@ -319,7 +320,7 @@ class MemTableRepFactory : public Customizable {

void Init() {
switch_memtable_thread_ =
std::thread(&MemTableRepFactory::PrepareSwitchMemTable, this);
port::Thread(&MemTableRepFactory::PrepareSwitchMemTable, this);
// need to verify the thread was executed
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
Expand Down Expand Up @@ -420,7 +421,7 @@ class MemTableRepFactory : public Customizable {
bool enable_switch_memtable_ = false;

private:
std::thread switch_memtable_thread_;
port::Thread switch_memtable_thread_;
std::mutex switch_memtable_thread_mutex_;
std::condition_variable switch_memtable_thread_cv_;
std::atomic<bool> terminate_switch_memtable_ = false;
Expand Down
3 changes: 2 additions & 1 deletion include/rocksdb/write_buffer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <utility>
#include <vector>

#include "port/port.h"
#include "rocksdb/cache.h"

namespace ROCKSDB_NAMESPACE {
Expand Down Expand Up @@ -447,7 +448,7 @@ class WriteBufferManager final {
// reason to wakeup. See the thread's code for more details
bool new_flushes_wakeup_ = false;

std::thread flushes_thread_;
port::Thread flushes_thread_;
bool terminate_flushes_thread_ = false;
};

Expand Down
2 changes: 1 addition & 1 deletion memtable/write_buffer_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ void WriteBufferManager::InitFlushInitiationVars(size_t quota) {

if (flushes_thread_.joinable() == false) {
flushes_thread_ =
std::thread(&WriteBufferManager::InitiateFlushesThread, this);
port::Thread(&WriteBufferManager::InitiateFlushesThread, this);
}
}

Expand Down
4 changes: 2 additions & 2 deletions plugin/speedb/memtable/spdb_sorted_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class SpdbVectorContainer {
spdb_vectors_.push_front(spdb_vector);
spdb_vector->SetVectorListIter(std::prev(spdb_vectors_.end()));
curr_vector_.store(spdb_vector.get());
sort_thread_ = std::thread(&SpdbVectorContainer::SortThread, this);
sort_thread_ = port::Thread(&SpdbVectorContainer::SortThread, this);
}

~SpdbVectorContainer() {
Expand Down Expand Up @@ -246,7 +246,7 @@ class SpdbVectorContainer {
std::atomic<bool> immutable_;
// sort thread info
std::atomic<size_t> num_elements_;
std::thread sort_thread_;
port::Thread sort_thread_;
std::mutex sort_thread_mutex_;
std::condition_variable sort_thread_cv_;
};
Expand Down
41 changes: 39 additions & 2 deletions port/port_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#pragma once

#include <thread>

#include <iostream>
#include "rocksdb/port_defs.h"
#include "rocksdb/rocksdb_namespace.h"

Expand Down Expand Up @@ -176,7 +176,44 @@ class CondVar {
Mutex* mu_;
};

using Thread = std::thread;

class ThreadRocksDB
{
public:
template <typename Function, typename... Args>
ThreadRocksDB(Function&& func, Args&&... args)
{
thread_ = std::thread(std::forward<Function>(func), std::forward<Args>(args)...);
}

ThreadRocksDB() {}
bool joinable() {
return thread_.joinable();
}

void join() {
thread_.join();
}
void detach() {
thread_.detach();
}
std::thread::id get_id() {
return thread_.get_id();
}
std::thread& operator=(std::thread &&__t) {
thread_ = std::move(__t);
return thread_;
}
std::thread::native_handle_type native_handle() {
return thread_.native_handle();
}


private:
std::thread thread_;
};

using Thread = ThreadRocksDB;

static inline void AsmVolatilePause() {
#if defined(__i386__) || defined(__x86_64__)
Expand Down

0 comments on commit 6359776

Please sign in to comment.