Skip to content

Commit

Permalink
Write Flow: Spdb WF (#445)
Browse files Browse the repository at this point in the history
EXPERIMENTAL feature.

This write flow doesn't wait for the WAL to finish a write before writing
concurrently to the memtable, shortening the amount of time threads have
to wait for a write to complete in a heavily multithreaded write scenario.
  • Loading branch information
ayulas authored and Yuval-Ariel committed May 2, 2023
1 parent 57d8d0f commit fd233f2
Show file tree
Hide file tree
Showing 20 changed files with 705 additions and 4 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,7 @@ set(SOURCES
db/db_impl/compacted_db_impl.cc
db/db_impl/db_impl.cc
db/db_impl/db_impl_write.cc
db/db_impl/db_spdb_impl_write.cc
db/db_impl/db_impl_compaction_flush.cc
db/db_impl/db_impl_files.cc
db/db_impl/db_impl_open.cc
Expand Down
2 changes: 2 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/db_impl/db_impl_readonly.cc",
"db/db_impl/db_impl_secondary.cc",
"db/db_impl/db_impl_write.cc",
"db/db_impl/db_spdb_impl_write.cc",
"db/db_info_dumper.cc",
"db/db_iter.cc",
"db/dbformat.cc",
Expand Down Expand Up @@ -411,6 +412,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"db/db_impl/db_impl_readonly.cc",
"db/db_impl/db_impl_secondary.cc",
"db/db_impl/db_impl_write.cc",
"db/db_impl/db_spdb_impl_write.cc",
"db/db_info_dumper.cc",
"db/db_iter.cc",
"db/dbformat.cc",
Expand Down
9 changes: 9 additions & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,10 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
if (write_buffer_manager_) {
wbm_stall_.reset(new WBMStallInterface());
}

if (immutable_db_options_.use_spdb_writes) {
spdb_write_.reset(new SpdbWriteImpl(this));
}
}

Status DBImpl::Resume() {
Expand Down Expand Up @@ -562,6 +566,11 @@ Status DBImpl::CloseHelper() {
}
mutex_.Unlock();

// Shutdown Spdb write in order to ensure no writes will be handled
if (spdb_write_) {
spdb_write_->Shutdown();
}

// Below check is added as recovery_error_ is not checked and it causes crash
// in DBSSTTest.DBWithMaxSpaceAllowedWithBlobFiles when space limit is
// reached.
Expand Down
23 changes: 23 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "db/column_family.h"
#include "db/compaction/compaction_iterator.h"
#include "db/compaction/compaction_job.h"
#include "db/db_impl/db_spdb_impl_write.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/external_sst_file_ingestion_job.h"
Expand Down Expand Up @@ -1265,6 +1266,25 @@ class DBImpl : public DB {
static void TEST_ResetDbSessionIdGen();
static std::string GenerateDbSessionId(Env* env);

public:
// SPDB write
bool CheckIfActionNeeded();
Status RegisterFlushOrTrim();
void SetLastSequence(uint64_t seq_inc) {
versions_->SetLastSequence(seq_inc);
}
uint64_t FetchAddLastAllocatedSequence(uint64_t batch_count) {
return versions_->FetchAddLastAllocatedSequence(batch_count);
}
Status SpdbWrite(const WriteOptions& write_options, WriteBatch* my_batch,
bool disable_memtable);
IOStatus SpdbWriteToWAL(WriteBatch* merged_batch, size_t write_with_wal,
const WriteBatch* to_be_cached_state, bool do_flush,
uint64_t* offset, uint64_t* size);
IOStatus SpdbSyncWAL(uint64_t offset, uint64_t size);

void SuspendSpdbWrites();
void ResumeSpdbWrites();
bool seq_per_batch() const { return seq_per_batch_; }

protected:
Expand Down Expand Up @@ -2709,6 +2729,9 @@ class DBImpl : public DB {

BlobFileCompletionCallback blob_callback_;

// Pointer to Speedb write flow
std::unique_ptr<SpdbWriteImpl> spdb_write_;

// Pointer to WriteBufferManager stalling interface.
std::unique_ptr<StallInterface> wbm_stall_;

Expand Down
5 changes: 5 additions & 0 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2097,6 +2097,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
autovector<FlushRequest> flush_reqs;
autovector<uint64_t> memtable_ids_to_wait;
{
SuspendSpdbWrites();
WriteContext context;
InstrumentedMutexLock guard_lock(&mutex_);

Expand Down Expand Up @@ -2161,6 +2162,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
}
}
}
ResumeSpdbWrites();

if (s.ok() && !flush_reqs.empty()) {
for (const auto& req : flush_reqs) {
Expand Down Expand Up @@ -2279,6 +2281,7 @@ Status DBImpl::AtomicFlushMemTables(
FlushRequest flush_req;
autovector<ColumnFamilyData*> cfds;
{
SuspendSpdbWrites();
WriteContext context;
InstrumentedMutexLock guard_lock(&mutex_);

Expand Down Expand Up @@ -2314,6 +2317,8 @@ Status DBImpl::AtomicFlushMemTables(
break;
}
}
ResumeSpdbWrites();

if (s.ok()) {
AssignAtomicFlushSeq(cfds);
for (auto cfd : cfds) {
Expand Down
7 changes: 7 additions & 0 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return Status::NotSupported(
"pipelined_writes is not compatible with concurrent prepares");
}
if (immutable_db_options_.allow_concurrent_memtable_write && spdb_write_) {
// TBD AYELET this is temporary. the handle of transaction in write flow
// needs careful assignment
return SpdbWrite(write_options, my_batch, disable_memtable);
}
assert(!seq_per_batch_ || batch_cnt != 0);

if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) {
// TODO(yiwu): update pipeline write with seq_per_batch and batch_cnt
return Status::NotSupported(
Expand Down
Loading

0 comments on commit fd233f2

Please sign in to comment.