Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

444:Spdb WF #445

Merged
merged 2 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,7 @@ set(SOURCES
db/db_impl/compacted_db_impl.cc
db/db_impl/db_impl.cc
db/db_impl/db_impl_write.cc
db/db_impl/db_spdb_impl_write.cc
db/db_impl/db_impl_compaction_flush.cc
db/db_impl/db_impl_files.cc
db/db_impl/db_impl_open.cc
Expand Down
2 changes: 2 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/db_impl/db_impl_readonly.cc",
"db/db_impl/db_impl_secondary.cc",
"db/db_impl/db_impl_write.cc",
"db/db_impl/db_spdb_impl_write.cc",
"db/db_info_dumper.cc",
"db/db_iter.cc",
"db/dbformat.cc",
Expand Down Expand Up @@ -402,6 +403,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"db/db_impl/db_impl_readonly.cc",
"db/db_impl/db_impl_secondary.cc",
"db/db_impl/db_impl_write.cc",
"db/db_impl/db_spdb_impl_write.cc",
"db/db_info_dumper.cc",
"db/db_iter.cc",
"db/dbformat.cc",
Expand Down
9 changes: 9 additions & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
if (write_buffer_manager_) {
wbm_stall_.reset(new WBMStallInterface());
}

if (immutable_db_options_.use_spdb_writes) {
spdb_write_.reset(new SpdbWriteImpl(this));
}
}

Status DBImpl::Resume() {
Expand Down Expand Up @@ -574,6 +578,11 @@ Status DBImpl::CloseHelper() {
}
mutex_.Unlock();

// Shutdown Spdb write in order to ensure no writes will be handled
if (spdb_write_) {
spdb_write_->Shutdown();
}

// Below check is added as recovery_error_ is not checked and it causes crash
// in DBSSTTest.DBWithMaxSpaceAllowedWithBlobFiles when space limit is
// reached.
Expand Down
23 changes: 23 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "db/column_family.h"
#include "db/compaction/compaction_iterator.h"
#include "db/compaction/compaction_job.h"
#include "db/db_impl/db_spdb_impl_write.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/external_sst_file_ingestion_job.h"
Expand Down Expand Up @@ -1256,6 +1257,25 @@ class DBImpl : public DB {
static void TEST_ResetDbSessionIdGen();
static std::string GenerateDbSessionId(Env* env);

public:
// SPDB write
bool CheckIfActionNeeded();
Status RegisterFlushOrTrim();
void SetLastSequence(uint64_t seq_inc) {
versions_->SetLastSequence(seq_inc);
}
uint64_t FetchAddLastAllocatedSequence(uint64_t batch_count) {
return versions_->FetchAddLastAllocatedSequence(batch_count);
}
Status SpdbWrite(const WriteOptions& write_options, WriteBatch* my_batch,
bool disable_memtable);
IOStatus SpdbWriteToWAL(WriteBatch* merged_batch, size_t write_with_wal,
const WriteBatch* to_be_cached_state, bool do_flush,
uint64_t* offset, uint64_t* size);
IOStatus SpdbSyncWAL(uint64_t offset, uint64_t size);

void SuspendSpdbWrites();
void ResumeSpdbWrites();
bool seq_per_batch() const { return seq_per_batch_; }

protected:
Expand Down Expand Up @@ -2685,6 +2705,9 @@ class DBImpl : public DB {

BlobFileCompletionCallback blob_callback_;

// Pointer to Speedb write flow
std::unique_ptr<SpdbWriteImpl> spdb_write_;

// Pointer to WriteBufferManager stalling interface.
std::unique_ptr<StallInterface> wbm_stall_;

Expand Down
5 changes: 5 additions & 0 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2033,6 +2033,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
autovector<FlushRequest> flush_reqs;
autovector<uint64_t> memtable_ids_to_wait;
{
SuspendSpdbWrites();
WriteContext context;
InstrumentedMutexLock guard_lock(&mutex_);

Expand Down Expand Up @@ -2097,6 +2098,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
}
}
}
ResumeSpdbWrites();

if (s.ok() && !flush_reqs.empty()) {
for (const auto& req : flush_reqs) {
Expand Down Expand Up @@ -2177,6 +2179,7 @@ Status DBImpl::AtomicFlushMemTables(
FlushRequest flush_req;
autovector<ColumnFamilyData*> cfds;
{
SuspendSpdbWrites();
WriteContext context;
InstrumentedMutexLock guard_lock(&mutex_);

Expand Down Expand Up @@ -2211,6 +2214,8 @@ Status DBImpl::AtomicFlushMemTables(
break;
}
}
ResumeSpdbWrites();

if (s.ok()) {
AssignAtomicFlushSeq(cfds);
for (auto cfd : cfds) {
Expand Down
7 changes: 7 additions & 0 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return Status::NotSupported(
"pipelined_writes is not compatible with concurrent prepares");
}
if (immutable_db_options_.allow_concurrent_memtable_write && spdb_write_) {
// TBD AYELET this is temporary. the handle of transaction in write flow
// needs careful assignment
return SpdbWrite(write_options, my_batch, disable_memtable);
}
assert(!seq_per_batch_ || batch_cnt != 0);

if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) {
// TODO(yiwu): update pipeline write with seq_per_batch and batch_cnt
return Status::NotSupported(
Expand Down
Loading