diff --git a/CMakeLists.txt b/CMakeLists.txt index e5920aef73..2972cdc1fb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -723,6 +723,7 @@ set(SOURCES db/db_impl/compacted_db_impl.cc db/db_impl/db_impl.cc db/db_impl/db_impl_write.cc + db/db_impl/db_spdb_impl_write.cc db/db_impl/db_impl_compaction_flush.cc db/db_impl/db_impl_files.cc db/db_impl/db_impl_open.cc diff --git a/TARGETS b/TARGETS index 210a3b7cd4..1c816b74c6 100644 --- a/TARGETS +++ b/TARGETS @@ -63,6 +63,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "db/db_impl/db_impl_readonly.cc", "db/db_impl/db_impl_secondary.cc", "db/db_impl/db_impl_write.cc", + "db/db_impl/db_spdb_impl_write.cc", "db/db_info_dumper.cc", "db/db_iter.cc", "db/dbformat.cc", @@ -411,6 +412,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "db/db_impl/db_impl_readonly.cc", "db/db_impl/db_impl_secondary.cc", "db/db_impl/db_impl_write.cc", + "db/db_impl/db_spdb_impl_write.cc", "db/db_info_dumper.cc", "db/db_iter.cc", "db/dbformat.cc", diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 176ffc384d..c0e2f90292 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -293,6 +293,10 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, if (write_buffer_manager_) { wbm_stall_.reset(new WBMStallInterface()); } + + if (immutable_db_options_.use_spdb_writes) { + spdb_write_.reset(new SpdbWriteImpl(this)); + } } Status DBImpl::Resume() { @@ -562,6 +566,11 @@ Status DBImpl::CloseHelper() { } mutex_.Unlock(); + // Shutdown Spdb write in order to ensure no writes will be handled + if (spdb_write_) { + spdb_write_->Shutdown(); + } + // Below check is added as recovery_error_ is not checked and it causes crash // in DBSSTTest.DBWithMaxSpaceAllowedWithBlobFiles when space limit is // reached. diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 247d5c3d6d..32f00bffe0 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -23,6 +23,7 @@ #include "db/column_family.h" #include "db/compaction/compaction_iterator.h" #include "db/compaction/compaction_job.h" +#include "db/db_impl/db_spdb_impl_write.h" #include "db/error_handler.h" #include "db/event_helpers.h" #include "db/external_sst_file_ingestion_job.h" @@ -1265,6 +1266,25 @@ class DBImpl : public DB { static void TEST_ResetDbSessionIdGen(); static std::string GenerateDbSessionId(Env* env); + public: + // SPDB write + bool CheckIfActionNeeded(); + Status RegisterFlushOrTrim(); + void SetLastSequence(uint64_t seq_inc) { + versions_->SetLastSequence(seq_inc); + } + uint64_t FetchAddLastAllocatedSequence(uint64_t batch_count) { + return versions_->FetchAddLastAllocatedSequence(batch_count); + } + Status SpdbWrite(const WriteOptions& write_options, WriteBatch* my_batch, + bool disable_memtable); + IOStatus SpdbWriteToWAL(WriteBatch* merged_batch, size_t write_with_wal, + const WriteBatch* to_be_cached_state, bool do_flush, + uint64_t* offset, uint64_t* size); + IOStatus SpdbSyncWAL(uint64_t offset, uint64_t size); + + void SuspendSpdbWrites(); + void ResumeSpdbWrites(); bool seq_per_batch() const { return seq_per_batch_; } protected: @@ -2709,6 +2729,9 @@ class DBImpl : public DB { BlobFileCompletionCallback blob_callback_; + // Pointer to Speedb write flow + std::unique_ptr spdb_write_; + // Pointer to WriteBufferManager stalling interface. 
std::unique_ptr wbm_stall_; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 9f42742983..20e7b16db8 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -2097,6 +2097,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, autovector flush_reqs; autovector memtable_ids_to_wait; { + SuspendSpdbWrites(); WriteContext context; InstrumentedMutexLock guard_lock(&mutex_); @@ -2161,6 +2162,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, } } } + ResumeSpdbWrites(); if (s.ok() && !flush_reqs.empty()) { for (const auto& req : flush_reqs) { @@ -2279,6 +2281,7 @@ Status DBImpl::AtomicFlushMemTables( FlushRequest flush_req; autovector cfds; { + SuspendSpdbWrites(); WriteContext context; InstrumentedMutexLock guard_lock(&mutex_); @@ -2314,6 +2317,8 @@ Status DBImpl::AtomicFlushMemTables( break; } } + ResumeSpdbWrites(); + if (s.ok()) { AssignAtomicFlushSeq(cfds); for (auto cfd : cfds) { diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 628a1e618a..4d3f7adfd7 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -229,6 +229,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, return Status::NotSupported( "pipelined_writes is not compatible with concurrent prepares"); } + if (immutable_db_options_.allow_concurrent_memtable_write && spdb_write_) { + // TBD AYELET this is temporary. the handle of transaction in write flow + // needs careful assignment + return SpdbWrite(write_options, my_batch, disable_memtable); + } + assert(!seq_per_batch_ || batch_cnt != 0); + if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) { // TODO(yiwu): update pipeline write with seq_per_batch and batch_cnt return Status::NotSupported( diff --git a/db/db_impl/db_spdb_impl_write.cc b/db/db_impl/db_spdb_impl_write.cc new file mode 100644 index 0000000000..b1946f5d16 --- /dev/null +++ b/db/db_impl/db_spdb_impl_write.cc @@ -0,0 +1,493 @@ +// Copyright 2022 SpeeDB Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include "db/db_impl/db_spdb_impl_write.h" + +#include "db/db_impl/db_impl.h" +#include "db/write_batch_internal.h" +#include "logging/logging.h" +#include "monitoring/instrumented_mutex.h" +#include "rocksdb/statistics.h" +#include "rocksdb/status.h" +#include "rocksdb/system_clock.h" +#include "util/mutexlock.h" + +namespace ROCKSDB_NAMESPACE { +#define MAX_ELEMENTS_IN_BATCH_GROUP 16 +// add_buffer_mutex_ is held +bool WritesBatchList::Add(WriteBatch* batch, const WriteOptions& write_options, + bool* leader_batch) { + elements_num_++; + if (elements_num_ == MAX_ELEMENTS_IN_BATCH_GROUP) { + switch_wb_.store(true); + } + const size_t seq_inc = batch->Count(); + max_seq_ = WriteBatchInternal::Sequence(batch) + seq_inc - 1; + + if (!write_options.disableWAL) { + wal_writes_.push_back(batch); + } + if (write_options.sync && wal_writes_.size() != 0) { + need_sync_ = true; + } + if (elements_num_ == 1) { + // first wal batch . 
+    // should take the buffer_write_rw_lock_ as write
+    *leader_batch = true;
+    buffer_write_rw_lock_.WriteLock();
+  }
+  write_ref_rwlock_.ReadLock();
+  return switch_wb_.load();
+}
+
+void WritesBatchList::WriteBatchComplete(bool leader_batch) {
+  // Batch was added to the memtable, we can release the memtable_ref.
+  write_ref_rwlock_.ReadUnlock();
+  if (leader_batch) {
+    {
+      // make sure all batches were written to the memtable (if needed) to be
+      // able to progress the version
+      WriteLock wl(&write_ref_rwlock_);
+    }
+    complete_batch_.store(true);
+    // the wal write has been completed, so wal waiters can be released
+    buffer_write_rw_lock_.WriteUnlock();
+  } else {
+    // wait for the wal write to complete
+    ReadLock rl(&buffer_write_rw_lock_);
+  }
+}
+
+void WritesBatchList::WaitForPendingWrites() {
+  // make sure all batches were written to the memtable (if needed) to be able
+  // to progress the version
+  WriteLock wl(&write_ref_rwlock_);
+}
+
+void SpdbWriteImpl::WriteBatchComplete(void* list, bool leader_batch) {
+  WritesBatchList* wb_list = static_cast<WritesBatchList*>(list);
+  if (leader_batch) {
+    SwitchAndWriteBatchGroup(wb_list);
+  } else {
+    wb_list->WriteBatchComplete(false);
+  }
+}
+
+void SpdbWriteImpl::SpdbFlushWriteThread() {
+  for (;;) {
+    {
+      std::unique_lock<std::mutex> lck(flush_thread_mutex_);
+      auto duration = std::chrono::seconds(5);
+      auto cv_status = flush_thread_cv_.wait_for(lck, duration);
+
+      // Exit if the wait was interrupted by a notification (shutdown) or if
+      // termination was requested; otherwise the wait simply timed out.
+      if (cv_status != std::cv_status::timeout ||
+          flush_thread_terminate_.load()) {
+        return;
+      }
+    }
+    if (db_->CheckIfActionNeeded()) {
+      // make sure there are no writes in flight
+      flush_rwlock_.WriteLock();
+      db_->RegisterFlushOrTrim();
+      flush_rwlock_.WriteUnlock();
+    }
+  }
+}
+
+SpdbWriteImpl::SpdbWriteImpl(DBImpl* db)
+    : db_(db),
+      flush_thread_terminate_(false),
+      flush_thread_(&SpdbWriteImpl::SpdbFlushWriteThread, this) {
+#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
+#if __GLIBC_PREREQ(2, 12)
+  auto thread_handle = flush_thread_.native_handle();
+  pthread_setname_np(thread_handle, "speedb:wflush");
+#endif
+#endif
+  wb_lists_.push_back(std::make_shared<WritesBatchList>());
+}
+
+SpdbWriteImpl::~SpdbWriteImpl() {
+  Shutdown();
+  flush_thread_.join();
+}
+
+void SpdbWriteImpl::Shutdown() {
+  { WriteLock wl(&flush_rwlock_); }
+  {
+    std::unique_lock<std::mutex> lck(flush_thread_mutex_);
+    flush_thread_terminate_ = true;
+  }
+  flush_thread_cv_.notify_one();
+}
+
+bool DBImpl::CheckIfActionNeeded() {
+  InstrumentedMutexLock l(&mutex_);
+
+  if (total_log_size_ > GetMaxTotalWalSize()) {
+    return true;
+  }
+
+  if (write_buffer_manager_->ShouldFlush()) {
+    return true;
+  }
+
+  if (!flush_scheduler_.Empty()) {
+    return true;
+  }
+
+  if (!trim_history_scheduler_.Empty()) {
+    return true;
+  }
+  return false;
+}
+
+Status DBImpl::RegisterFlushOrTrim() {
+  Status status;
+  WriteContext write_context;
+  InstrumentedMutexLock l(&mutex_);
+
+  if (UNLIKELY(status.ok() && total_log_size_ > GetMaxTotalWalSize())) {
+    status = SwitchWAL(&write_context);
+  }
+
+  if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
+    status = HandleWriteBufferManagerFlush(&write_context);
+  }
+
+  if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
+    status = ScheduleFlushes(&write_context);
+  }
+
+  if (UNLIKELY(status.ok() && !trim_history_scheduler_.Empty())) {
+    status = TrimMemtableHistory(&write_context);
+  }
+  return status;
+}
+
+std::shared_ptr<WritesBatchList> SpdbWriteImpl::Add(
+    WriteBatch* batch, const WriteOptions& write_options, bool* leader_batch) {
+  MutexLock l(&add_buffer_mutex_);
+  std::shared_ptr<WritesBatchList> current_wb = nullptr;
+  {
+    MutexLock wb_list_lock(&wb_list_mutex_);
+    current_wb = wb_lists_.back();
+  }
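+  // FetchAddLastAllocatedSequence(n) atomically advances the last allocated
+  // sequence by n and returns its previous value, so this batch owns the
+  // range [prev + 1, prev + n]. For example, if the last allocated sequence
+  // was 100 and batch->Count() == 3, the batch is assigned sequences 101..103
+  // and the group's max_seq_ becomes 103.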
+  const uint64_t sequence =
+      db_->FetchAddLastAllocatedSequence(batch->Count()) + 1;
+  WriteBatchInternal::SetSequence(batch, sequence);
+  current_wb->Add(batch, write_options, leader_batch);
+  /*if (need_switch_wb) {
+    //create new wb
+    wb_lists_.push_back(std::make_shared<WritesBatchList>());
+  }*/
+  return current_wb;
+}
+
+std::shared_ptr<WritesBatchList> SpdbWriteImpl::AddMerge(
+    WriteBatch* batch, const WriteOptions& write_options, bool* leader_batch) {
+  // this will be released AFTER this batch has been written to the memtable!
+  add_buffer_mutex_.Lock();
+  std::shared_ptr<WritesBatchList> current_wb = nullptr;
+  const uint64_t sequence =
+      db_->FetchAddLastAllocatedSequence(batch->Count()) + 1;
+  WriteBatchInternal::SetSequence(batch, sequence);
+  // need to wait for all prev batches to complete their memtable writes, and
+  // prevent new batches from writing to the memtable before this one
+
+  {
+    MutexLock l(&wb_list_mutex_);
+    for (std::list<std::shared_ptr<WritesBatchList>>::iterator iter =
+             wb_lists_.begin();
+         iter != wb_lists_.end(); ++iter) {
+      (*iter)->WaitForPendingWrites();
+    }
+    current_wb = wb_lists_.back();
+  }
+  current_wb->Add(batch, write_options, leader_batch);
+
+  return current_wb;
+}
+// release the add merge lock
+void SpdbWriteImpl::CompleteMerge() { add_buffer_mutex_.Unlock(); }
+
+void SpdbWriteImpl::Lock(bool is_read) {
+  if (is_read) {
+    flush_rwlock_.ReadLock();
+  } else {
+    flush_rwlock_.WriteLock();
+  }
+}
+
+void SpdbWriteImpl::Unlock(bool is_read) {
+  if (is_read) {
+    flush_rwlock_.ReadUnlock();
+  } else {
+    flush_rwlock_.WriteUnlock();
+  }
+}
+
+void SpdbWriteImpl::SwitchBatchGroupIfNeeded() {
+  MutexLock l(&add_buffer_mutex_);
+  MutexLock wb_list_lock(&wb_list_mutex_);
+  // create new wb if needed
+  // if (!wb_list->IsSwitchWBOccur()) {
+  wb_lists_.push_back(std::make_shared<WritesBatchList>());
+  //}
+}
+
+void SpdbWriteImpl::PublishedSeq() {
+  uint64_t published_seq = 0;
+  {
+    MutexLock l(&wb_list_mutex_);
+    std::list<std::shared_ptr<WritesBatchList>>::iterator iter =
+        wb_lists_.begin();
+    while (iter != wb_lists_.end()) {
+      if ((*iter)->IsComplete()) {
+        published_seq = (*iter)->GetMaxSeq();
+        iter = wb_lists_.erase(iter);  // erase and go to next
+      } else {
+        break;
+      }
+    }
+    if (published_seq != 0) {
+      ROCKS_LOG_INFO(db_->immutable_db_options().info_log,
+                     "PublishedSeq %" PRIu64, published_seq);
+
+      db_->SetLastSequence(published_seq);
+    }
+  }
+}
+
+void SpdbWriteImpl::SwitchAndWriteBatchGroup(WritesBatchList* batch_group) {
+  // take the wal write mutex to prevent another batch group from writing to
+  // the WAL concurrently
+  IOStatus io_s;
+  uint64_t offset = 0;
+  uint64_t size = 0;
+  // uint64_t start_offset = 0;
+  // uint64_t total_size = 0;
+
+  wal_write_mutex_.Lock();
+  SwitchBatchGroupIfNeeded();
+  ROCKS_LOG_INFO(db_->immutable_db_options().info_log,
+                 "SwitchBatchGroup last batch group with %d batches and with "
+                 "publish seq %" PRIu64,
+                 batch_group->elements_num_, batch_group->GetMaxSeq());
+
+  if (!batch_group->wal_writes_.empty()) {
+    auto const& immutable_db_options = db_->immutable_db_options();
+    StopWatch write_sw(immutable_db_options.clock, immutable_db_options.stats,
+                       DB_WAL_WRITE_TIME);
+
+    const WriteBatch* to_be_cached_state = nullptr;
+    if (batch_group->wal_writes_.size() == 1 &&
+        batch_group->wal_writes_.front()
+            ->GetWalTerminationPoint()
+            .is_cleared()) {
+      WriteBatch* wal_batch = batch_group->wal_writes_.front();
+
+      if (WriteBatchInternal::IsLatestPersistentState(wal_batch)) {
+        to_be_cached_state = wal_batch;
+      }
+      io_s = db_->SpdbWriteToWAL(wal_batch, 1, to_be_cached_state,
+                                 batch_group->need_sync_, &offset, &size);
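+      // Common case: a single WAL batch is written out directly, with no
+      // extra copy. The else branch below handles groups with several WAL
+      // batches by appending them into tmp_batch_ and writing them as one
+      // record, splitting the write wherever the sequences are not
+      // contiguous.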
&size); + } else { + uint64_t progress_batch_seq; + size_t wal_writes = 0; + WriteBatch* merged_batch = &tmp_batch_; + for (const WriteBatch* batch : batch_group->wal_writes_) { + if (wal_writes != 0 && + (progress_batch_seq != WriteBatchInternal::Sequence(batch))) { + // this can happened if we have a batch group that consists no wal + // writes... need to divide the wal writes when the seq is broken + io_s = + db_->SpdbWriteToWAL(merged_batch, wal_writes, to_be_cached_state, + batch_group->need_sync_, &offset, &size); + // reset counter and state + tmp_batch_.Clear(); + wal_writes = 0; + to_be_cached_state = nullptr; + if (!io_s.ok()) { + // TBD what todo with error + break; + } + } + if (wal_writes == 0) { + // first batch seq to use when we will replay the wal after recovery + WriteBatchInternal::SetSequence(merged_batch, + WriteBatchInternal::Sequence(batch)); + } + // to be able knowing the batch are in seq order + progress_batch_seq = + WriteBatchInternal::Sequence(batch) + batch->Count(); + Status s = WriteBatchInternal::Append(merged_batch, batch, true); + // Always returns Status::OK.() + if (!s.ok()) { + assert(false); + } + if (WriteBatchInternal::IsLatestPersistentState(batch)) { + // We only need to cache the last of such write batch + to_be_cached_state = batch; + } + ++wal_writes; + } + if (wal_writes) { + io_s = db_->SpdbWriteToWAL(merged_batch, wal_writes, to_be_cached_state, + batch_group->need_sync_, &offset, &size); + tmp_batch_.Clear(); + } + } + } + wal_write_mutex_.Unlock(); + if (!io_s.ok()) { + // TBD what todo with error + ROCKS_LOG_ERROR(db_->immutable_db_options().info_log, + "Error write to wal!!! %s", io_s.ToString().c_str()); + } + + if (batch_group->need_sync_) { + db_->SpdbSyncWAL(offset, size); + } + + batch_group->WriteBatchComplete(true); + ROCKS_LOG_INFO(db_->immutable_db_options().info_log, + "Complete batch group with publish seq %" PRIu64, + batch_group->GetMaxSeq()); + + PublishedSeq(); +} + +Status DBImpl::SpdbWrite(const WriteOptions& write_options, WriteBatch* batch, + bool disable_memtable) { + assert(batch != nullptr && WriteBatchInternal::Count(batch) > 0); + StopWatch write_sw(immutable_db_options_.clock, immutable_db_options_.stats, + DB_WRITE); + + if (error_handler_.IsDBStopped()) { + return error_handler_.GetBGError(); + } + + last_batch_group_size_ = WriteBatchInternal::ByteSize(batch); + spdb_write_->Lock(true); + + if (write_options.disableWAL) { + has_unpersisted_data_.store(true, std::memory_order_relaxed); + } + + Status status; + bool leader_batch = false; + std::shared_ptr list; + if (batch->HasMerge()) { + // need to wait all prev batches completed to write to memetable and avoid + // new batches to write to memetable before this one + list = spdb_write_->AddMerge(batch, write_options, &leader_batch); + } else { + list = spdb_write_->Add(batch, write_options, &leader_batch); + } + + if (!disable_memtable) { + bool concurrent_memtable_writes = !batch->HasMerge(); + status = WriteBatchInternal::InsertInto( + batch, column_family_memtables_.get(), &flush_scheduler_, + &trim_history_scheduler_, write_options.ignore_missing_column_families, + 0 /*recovery_log_number*/, this, concurrent_memtable_writes, nullptr, + nullptr, seq_per_batch_, batch_per_txn_); + } + + if (batch->HasMerge()) { + spdb_write_->CompleteMerge(); + } + + // handle !status.ok() + spdb_write_->WriteBatchComplete(list.get(), leader_batch); + spdb_write_->Unlock(true); + + return status; +} + +void DBImpl::SuspendSpdbWrites() { + if (spdb_write_) { + 
+void DBImpl::SuspendSpdbWrites() {
+  if (spdb_write_) {
+    spdb_write_->Lock(false);
+  }
+}
+void DBImpl::ResumeSpdbWrites() {
+  if (spdb_write_) {
+    // must release the db mutex before unlocking the spdb flush lock to
+    // prevent a deadlock!!! the db mutex will be re-acquired after the unlock
+    mutex_.Unlock();
+    spdb_write_->Unlock(false);
+    // Lock the db mutex again so it is held as it was before we entered this
+    // function
+    mutex_.Lock();
+  }
+}
+
+IOStatus DBImpl::SpdbSyncWAL(uint64_t offset, uint64_t size) {
+  IOStatus io_s;
+  StopWatch sw(immutable_db_options_.clock, stats_, WAL_FILE_SYNC_MICROS);
+  {
+    InstrumentedMutexLock l(&log_write_mutex_);
+    log::Writer* log_writer = logs_.back().writer;
+    io_s = log_writer->SyncRange(immutable_db_options_.use_fsync, offset, size);
+    ROCKS_LOG_INFO(immutable_db_options().info_log,
+                   "Complete SyncRange offset %" PRIu64 " size %" PRIu64,
+                   offset, size);
+  }
+  if (io_s.ok() && !log_dir_synced_) {
+    io_s = directories_.GetWalDir()->FsyncWithDirOptions(
+        IOOptions(), nullptr,
+        DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
+    log_dir_synced_ = true;
+    ROCKS_LOG_INFO(immutable_db_options().info_log, "Complete Sync dir");
+  }
+  return io_s;
+}
+IOStatus DBImpl::SpdbWriteToWAL(WriteBatch* merged_batch, size_t write_with_wal,
+                                const WriteBatch* to_be_cached_state,
+                                bool do_flush, uint64_t* offset,
+                                uint64_t* size) {
+  assert(merged_batch != nullptr || write_with_wal == 0);
+  IOStatus io_s;
+
+  const Slice log_entry = WriteBatchInternal::Contents(merged_batch);
+  const uint64_t log_entry_size = log_entry.size();
+  {
+    InstrumentedMutexLock l(&log_write_mutex_);
+    log::Writer* log_writer = logs_.back().writer;
+    io_s = log_writer->AddRecordWithStartOffsetAndSize(log_entry, Env::IO_TOTAL,
+                                                       do_flush, offset, size);
+  }
+
+  total_log_size_ += log_entry_size;
+  // TODO(myabandeh): it might be unsafe to access alive_log_files_.back()
+  // here since alive_log_files_ might be modified concurrently
+  alive_log_files_.back().AddSize(log_entry_size);
+  log_empty_ = false;
+
+  if (to_be_cached_state != nullptr) {
+    cached_recoverable_state_ = *to_be_cached_state;
+    cached_recoverable_state_empty_ = false;
+  }
+
+  if (io_s.ok()) {
+    InternalStats* stats = default_cf_internal_stats_;
+
+    stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_entry_size);
+    RecordTick(stats_, WAL_FILE_BYTES, log_entry_size);
+    stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal);
+    RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
+  }
+
+  return io_s;
+}
+
+}  // namespace ROCKSDB_NAMESPACE
diff --git a/db/db_impl/db_spdb_impl_write.h b/db/db_impl/db_spdb_impl_write.h
new file mode 100644
index 0000000000..c592e34961
--- /dev/null
+++ b/db/db_impl/db_spdb_impl_write.h
@@ -0,0 +1,102 @@
+// Copyright 2022 SpeeDB Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
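+
+// Overview of the write flow implemented in db_spdb_impl_write.cc:
+//  - Writers add their WriteBatches to the current WritesBatchList; the
+//    first batch added to a group becomes the leader.
+//  - Writers insert into the memtable concurrently while holding
+//    write_ref_rwlock_ for read; the leader later takes it for write in
+//    order to wait for the whole group.
+//  - The leader switches to a new group, writes the group's WAL entries
+//    (serialized by wal_write_mutex_), releases buffer_write_rw_lock_, on
+//    which non-leader WAL waiters block, and publishes the group's last
+//    sequence.
+//  - flush_rwlock_ is held for read by every write and for write by the
+//    flush/shutdown paths in order to quiesce the write flow.
+//
+// Enabling the flow (sketch): set DBOptions::use_spdb_writes = true together
+// with allow_concurrent_memtable_write (see DBImpl::WriteImpl).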
+#pragma once
+
+#include <atomic>
+#include <condition_variable>
+#include <list>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+#include "port/port.h"
+#include "rocksdb/write_batch.h"
+#include "util/mutexlock.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class DBImpl;
+
+struct WritesBatchList {
+  std::list<WriteBatch*> wal_writes_;
+  uint16_t elements_num_ = 0;
+  uint64_t max_seq_ = 0;
+  port::RWMutexWr buffer_write_rw_lock_;
+  port::RWMutexWr write_ref_rwlock_;
+  std::atomic<bool> need_sync_ = false;
+  std::atomic<bool> switch_wb_ = false;
+  std::atomic<bool> complete_batch_ = false;
+  void Clear() {
+    wal_writes_.clear();
+    elements_num_ = 0;
+    max_seq_ = 0;
+    need_sync_ = false;
+    switch_wb_ = false;
+    complete_batch_ = false;
+  }
+
+ public:
+  bool Add(WriteBatch* batch, const WriteOptions& write_options,
+           bool* leader_batch);
+  uint64_t GetMaxSeq() const { return max_seq_; }
+  void WaitForPendingWrites();
+  bool IsSwitchWBOccur() const { return switch_wb_.load(); }
+  bool IsComplete() const { return complete_batch_.load(); }
+  void WriteBatchComplete(bool leader_batch);
+};
+
+class SpdbWriteImpl {
+ public:
+  SpdbWriteImpl(DBImpl* db);
+
+  ~SpdbWriteImpl();
+  void SpdbFlushWriteThread();
+
+  std::shared_ptr<WritesBatchList> Add(WriteBatch* batch,
+                                       const WriteOptions& write_options,
+                                       bool* leader_batch);
+  std::shared_ptr<WritesBatchList> AddMerge(WriteBatch* batch,
+                                            const WriteOptions& write_options,
+                                            bool* leader_batch);
+  void CompleteMerge();
+  void Shutdown();
+  void WaitForWalWriteComplete(void* list);
+  void WriteBatchComplete(void* list, bool leader_batch);
+  port::RWMutexWr& GetFlushRWLock() { return flush_rwlock_; }
+  void Lock(bool is_read);
+  void Unlock(bool is_read);
+
+ public:
+  void SwitchAndWriteBatchGroup(WritesBatchList* wb_list);
+  void SwitchBatchGroupIfNeeded();
+  void PublishedSeq();
+
+  std::atomic<uint64_t> last_wal_write_seq_{0};
+
+  std::list<std::shared_ptr<WritesBatchList>> wb_lists_;
+  DBImpl* db_;
+  std::atomic<bool> flush_thread_terminate_;
+  std::mutex flush_thread_mutex_;
+  std::condition_variable flush_thread_cv_;
+  port::Mutex add_buffer_mutex_;
+  port::RWMutexWr flush_rwlock_;
+  std::thread flush_thread_;
+  port::RWMutexWr wal_buffers_rwlock_;
+  port::Mutex wal_write_mutex_;
+  port::Mutex wb_list_mutex_;
+
+  WriteBatch tmp_batch_;
+};
+
+}  // namespace ROCKSDB_NAMESPACE
diff --git a/db/log_writer.cc b/db/log_writer.cc
index 56f58543e9..2dd4a702f8 100644
--- a/db/log_writer.cc
+++ b/db/log_writer.cc
@@ -62,7 +62,8 @@ IOStatus Writer::Close() {
 }
 
 IOStatus Writer::AddRecord(const Slice& slice,
-                           Env::IOPriority rate_limiter_priority) {
+                           Env::IOPriority rate_limiter_priority,
+                           bool /*do_flush*/) {
   const char* ptr = slice.data();
   size_t left = slice.size();
 
@@ -149,7 +150,7 @@ IOStatus Writer::AddRecord(const Slice& slice,
   } while (s.ok() && (left > 0 || compress_remaining > 0));
 
   if (s.ok()) {
-    if (!manual_flush_) {
+    if (!manual_flush_ /*&& do_flush*/) {
       s = dest_->Flush(rate_limiter_priority);
     }
   }
@@ -157,6 +158,26 @@
   return s;
 }
 
+IOStatus Writer::AddRecordWithStartOffsetAndSize(
+    const Slice& slice, Env::IOPriority rate_limiter_priority, bool do_flush,
+    uint64_t* offset, uint64_t* size) {
+  IOStatus s;
+  *offset = dest_->GetFileSize();
+  s = AddRecord(slice, rate_limiter_priority, do_flush);
+  *size = dest_->GetFileSize() - *offset + 1;
+  return s;
+}
+
+IOStatus Writer::SyncRange(bool use_fsync, uint64_t offset, uint64_t size) {
+  IOStatus s;
+  if (!manual_flush_) {
+    s = dest_->RangeSync(offset, size);
+  } else {
+    s = dest_->Sync(use_fsync);
+  }
+  return s;
+}
+
 IOStatus Writer::AddCompressionTypeRecord() {
   // Should be the first record
assert(block_offset_ == 0); diff --git a/db/log_writer.h b/db/log_writer.h index 5d266e4343..391ddbec25 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -84,7 +84,14 @@ class Writer { ~Writer(); IOStatus AddRecord(const Slice& slice, - Env::IOPriority rate_limiter_priority = Env::IO_TOTAL); + Env::IOPriority rate_limiter_priority = Env::IO_TOTAL, + bool do_flush = true); + IOStatus AddRecordWithStartOffsetAndSize( + const Slice& slice, Env::IOPriority rate_limiter_priority = Env::IO_TOTAL, + bool do_flush = true, uint64_t* offset = nullptr, + uint64_t* size = nullptr); + + IOStatus SyncRange(bool use_fsync, uint64_t offset, uint64_t size); IOStatus AddCompressionTypeRecord(); WritableFileWriter* file() { return dest_.get(); } diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 199bb65e8c..ae24ccbdfe 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -158,6 +158,7 @@ DECLARE_uint64(compaction_ttl); DECLARE_bool(fifo_allow_compaction); DECLARE_bool(allow_concurrent_memtable_write); DECLARE_double(experimental_mempurge_threshold); +DECLARE_bool(use_spdb_writes); DECLARE_bool(enable_write_thread_adaptive_yield); DECLARE_int32(reopen); DECLARE_string(filter_uri); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index a413e364d4..82bc68b3d5 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -398,6 +398,8 @@ DEFINE_bool(fifo_allow_compaction, false, "If true, set `Options::compaction_options_fifo.allow_compaction = " "true`. It only take effect when FIFO compaction is used."); +DEFINE_bool(use_spdb_writes, false, "Use optimized Speedb write flow"); + DEFINE_bool(allow_concurrent_memtable_write, false, "Allow multi-writers to update mem tables in parallel."); diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index aac0f59491..86e6eb8db5 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -282,6 +282,7 @@ class WritableFileWriter { std::string GetFileChecksum(); const char* GetFileChecksumFuncName() const; + IOStatus RangeSync(uint64_t offset, uint64_t nbytes); bool seen_error() const { return seen_error_.load(std::memory_order_relaxed); @@ -314,7 +315,6 @@ class WritableFileWriter { Env::IOPriority op_rate_limiter_priority); IOStatus WriteBufferedWithChecksum(const char* data, size_t size, Env::IOPriority op_rate_limiter_priority); - IOStatus RangeSync(uint64_t offset, uint64_t nbytes); IOStatus SyncInternal(bool use_fsync); }; } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 8d0d6ed1d1..28fcddb2cd 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1118,6 +1118,15 @@ struct DBOptions { // Default: true bool allow_concurrent_memtable_write = true; + // If true, uses an optimized write path that pipelines writes better in the + // presence of multiple writers. Only some memtable_factory-s would really + // benefit from this write flow, as it requires support for fast concurrent + // insertion in order to be effective. + // This is an experimental feature. + // + // Default: false + bool use_spdb_writes = false; + // If true, threads synchronizing with the write batch group leader will // wait for up to write_thread_max_yield_usec before blocking on a mutex. 
// This can substantially improve throughput for concurrent workloads, diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index c10c679190..6632f9d107 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -542,6 +542,11 @@ enum Histograms : uint32_t { // Wait time for aborting async read in FilePrefetchBuffer destructor ASYNC_PREFETCH_ABORT_MICROS, + DB_GET_MEMTABLE, + DB_WAL_WRITE_TIME, + DB_WRITE_WAIT_FOR_WAL, + DB_WRITE_WAIT_FOR_WAL_WITH_MUTEX, + // Number of bytes read for RocksDB's prefetching contents (as opposed to file // system's prefetch) from the end of SST table during block based table open TABLE_OPEN_PREFETCH_TAIL_READ_BYTES, diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc index 206372c7c7..4937e39da3 100644 --- a/monitoring/statistics.cc +++ b/monitoring/statistics.cc @@ -275,6 +275,10 @@ const std::vector> HistogramsNameMap = { {MULTIGET_IO_BATCH_SIZE, "rocksdb.multiget.io.batch.size"}, {NUM_LEVEL_READ_PER_MULTIGET, "rocksdb.num.level.read.per.multiget"}, {ASYNC_PREFETCH_ABORT_MICROS, "rocksdb.async.prefetch.abort.micros"}, + {DB_GET_MEMTABLE, "rocksdb.db.get.mem.micros"}, + {DB_WAL_WRITE_TIME, "rocksdb.db.wal.write.micros"}, + {DB_WRITE_WAIT_FOR_WAL, "rocksdb.db.write_wait_for_wal.micros"}, + {DB_WRITE_WAIT_FOR_WAL_WITH_MUTEX, "rocksdb.db.write_wait_mutex.micros"}, {TABLE_OPEN_PREFETCH_TAIL_READ_BYTES, "rocksdb.table.open.prefetch.tail.read.bytes"}, }; diff --git a/options/db_options.cc b/options/db_options.cc index 55fb631a55..2b3020c931 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -336,6 +336,10 @@ static std::unordered_map {offsetof(struct ImmutableDBOptions, allow_concurrent_memtable_write), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"use_spdb_writes", + {offsetof(struct ImmutableDBOptions, use_spdb_writes), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, {"wal_recovery_mode", OptionTypeInfo::Enum( offsetof(struct ImmutableDBOptions, wal_recovery_mode), @@ -737,6 +741,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) enable_pipelined_write(options.enable_pipelined_write), unordered_write(options.unordered_write), allow_concurrent_memtable_write(options.allow_concurrent_memtable_write), + use_spdb_writes(options.use_spdb_writes), enable_write_thread_adaptive_yield( options.enable_write_thread_adaptive_yield), write_thread_max_yield_usec(options.write_thread_max_yield_usec), @@ -891,6 +896,7 @@ void ImmutableDBOptions::Dump(Logger* log) const { unordered_write); ROCKS_LOG_HEADER(log, " Options.allow_concurrent_memtable_write: %d", allow_concurrent_memtable_write); + ROCKS_LOG_HEADER(log, " Options.use_spdb_writes: %d", use_spdb_writes); ROCKS_LOG_HEADER(log, " Options.enable_write_thread_adaptive_yield: %d", enable_write_thread_adaptive_yield); ROCKS_LOG_HEADER(log, diff --git a/options/db_options.h b/options/db_options.h index 063aada6ea..5c1fdfe120 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -69,6 +69,7 @@ struct ImmutableDBOptions { bool enable_pipelined_write; bool unordered_write; bool allow_concurrent_memtable_write; + bool use_spdb_writes; bool enable_write_thread_adaptive_yield; uint64_t write_thread_max_yield_usec; uint64_t write_thread_slow_yield_usec; diff --git a/src.mk b/src.mk index 264913c4e5..bba818e1e1 100644 --- a/src.mk +++ b/src.mk @@ -54,6 +54,7 @@ LIB_SOURCES = \ db/db_impl/db_impl_readonly.cc \ db/db_impl/db_impl_secondary.cc \ 
db/db_impl/db_impl_write.cc \ + db/db_impl/db_spdb_impl_write.cc \ db/db_info_dumper.cc \ db/db_iter.cc \ db/dbformat.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 22de6ad999..2014163422 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1580,6 +1580,7 @@ DEFINE_double(experimental_mempurge_threshold, ROCKSDB_NAMESPACE::Options().experimental_mempurge_threshold, "Maximum useful payload ratio estimate that triggers a mempurge " "(memtable garbage collection)."); +DEFINE_bool(use_spdb_writes, true, "Use optimized Speedb write flow"); DEFINE_bool(inplace_update_support, ROCKSDB_NAMESPACE::Options().inplace_update_support, @@ -4822,6 +4823,7 @@ class Benchmark { FLAGS_allow_concurrent_memtable_write; options.experimental_mempurge_threshold = FLAGS_experimental_mempurge_threshold; + options.use_spdb_writes = FLAGS_use_spdb_writes; options.inplace_update_support = FLAGS_inplace_update_support; options.inplace_update_num_locks = FLAGS_inplace_update_num_locks; options.enable_write_thread_adaptive_yield =