From b4b5c1fe5e1718b1e3818c10a8d1dba6b349d189 Mon Sep 17 00:00:00 2001 From: Yuval Ariel Date: Mon, 19 Dec 2022 22:18:19 +0200 Subject: [PATCH] Dynamic Delay: new feature - Dynamic Delay (#276) Calculate the delayed write rate based on the current state of the CF based on the value of delayed_write_rate that the user initially set. The calculation reduces speed linearly with regards to how much the CF has exceeded the slowdown conditions (L0 files and pending bytes). The slowdown for writing to the last memtable (when theres more than 3) is 10 fold --- db/column_family.cc | 176 +++++++-- db/column_family.h | 18 + db/column_family_test.cc | 523 +++++++++++++++----------- db/compaction/compaction_job_test.cc | 1 + db/db_impl/db_impl.cc | 3 +- db/db_options_test.cc | 1 + db/db_test.cc | 1 + db/db_wal_test.cc | 2 +- db/flush_job_test.cc | 1 + db/memtable_list_test.cc | 6 +- db/repair.cc | 2 +- db/version_set.cc | 2 +- db/version_set_test.cc | 1 + db/version_util.h | 2 +- db/wal_manager_test.cc | 1 + db/write_controller.h | 8 +- db/write_controller_test.cc | 20 +- db_stress_tool/db_stress_common.h | 1 + db_stress_tool/db_stress_gflags.cc | 3 + db_stress_tool/db_stress_test_base.cc | 1 + include/rocksdb/options.h | 13 + options/db_options.cc | 7 + options/db_options.h | 1 + options/options_helper.cc | 1 + options/options_settable_test.cc | 3 +- tools/db_bench_tool.cc | 4 + tools/db_crashtest.py | 1 + tools/ldb_cmd.cc | 6 +- tools/ldb_cmd_test.cc | 2 +- 29 files changed, 544 insertions(+), 267 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index 2bc4d397eb..49586cded9 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -881,6 +881,39 @@ int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger, } } // namespace +namespace { +const uint64_t Gb = 1ull << 30; +const uint64_t Mb = 1ull << 20; +const uint64_t kMinWriteRate = 16 * 1024u; // Minimum write rate 16KB/s. 
+const int kMemtablePenalty = 10; +} // namespace + +double ColumnFamilyData::TEST_CalculateWriteDelayDivider( + uint64_t compaction_needed_bytes, + const MutableCFOptions& mutable_cf_options, + WriteStallCause& write_stall_cause) { + return CalculateWriteDelayDividerAndMaybeUpdateWriteStallCause( + compaction_needed_bytes, mutable_cf_options, write_stall_cause); +} + +std::unique_ptr ColumnFamilyData::DynamicSetupDelay( + WriteController* write_controller, uint64_t compaction_needed_bytes, + const MutableCFOptions& mutable_cf_options, + WriteStallCause& write_stall_cause) { + uint64_t max_write_rate = write_controller->max_delayed_write_rate(); + + const double rate_divider = + CalculateWriteDelayDividerAndMaybeUpdateWriteStallCause( + compaction_needed_bytes, mutable_cf_options, write_stall_cause); + assert(rate_divider >= 1); + auto write_rate = static_cast(max_write_rate / rate_divider); + if (write_rate < kMinWriteRate) { + write_rate = kMinWriteRate; + } + + return write_controller->GetDelayToken(write_rate); +} + std::pair ColumnFamilyData::GetWriteStallConditionAndCause( int num_unflushed_memtables, int num_l0_files, @@ -919,6 +952,86 @@ ColumnFamilyData::GetWriteStallConditionAndCause( return {WriteStallCondition::kNormal, WriteStallCause::kNone}; } +// Delay divider is by how much we divide the users delayed_write_rate. +// E.g. divider 10 will result in 10 Mb/s from users 100 Mb/s. +// The rate is reduced linearly according to the range from slowdown to stop. +double +ColumnFamilyData::CalculateWriteDelayDividerAndMaybeUpdateWriteStallCause( + uint64_t compaction_needed_bytes, + const MutableCFOptions& mutable_cf_options, + ColumnFamilyData::WriteStallCause& write_stall_cause) { + assert(current_ != nullptr); + + const auto* vstorage = current_->storage_info(); + + // Memtables + // this can only be entered when we're at the last memtable and theres more + // than 3. delay by 10X when writing to the last memtable. 
+ double memtable_divider = 1; + auto num_unflushed_memtables = imm()->NumNotFlushed(); + if (mutable_cf_options.max_write_buffer_number > 3 && + num_unflushed_memtables >= + mutable_cf_options.max_write_buffer_number - 1 && + num_unflushed_memtables - 1 >= + ioptions_.min_write_buffer_number_to_merge) { + memtable_divider = kMemtablePenalty; + } + + // Pending Compaction Bytes + double pending_divider = 1; + auto soft_limit = mutable_cf_options.soft_pending_compaction_bytes_limit; + if (soft_limit > 0 && compaction_needed_bytes >= soft_limit) { + auto hard_limit = mutable_cf_options.hard_pending_compaction_bytes_limit; + // soft_limit != hard_limit here. we're in a kDelayed state here and not + // stop. + assert(hard_limit > soft_limit); + uint64_t soft_hard_range = hard_limit - soft_limit; + // change rate every 1G change or 100Mb if soft_hard_range is too small. + auto step_size = soft_hard_range > Gb ? Gb : 100 * Mb; + uint64_t num_steps = soft_hard_range / step_size; + auto extra_bytes = compaction_needed_bytes - soft_limit; + auto step_num = static_cast(extra_bytes / step_size); + assert(step_num < num_steps); + if (num_steps > 0) { + pending_divider = 1 / (1 - (static_cast(step_num) / num_steps)); + } + } + + double biggest_divider = 1; + if (memtable_divider > pending_divider) { + biggest_divider = memtable_divider; + write_stall_cause = WriteStallCause::kMemtableLimit; + } else if (pending_divider > 1) { + biggest_divider = pending_divider; + write_stall_cause = WriteStallCause::kPendingCompactionBytes; + } + + // dont delay based on L0 when the user disables auto compactions + if (mutable_cf_options.disable_auto_compactions) { + return biggest_divider; + } + + // L0 files + double l0_divider = 1; + const auto extra_l0_ssts = vstorage->NumLevelFiles(0) - + mutable_cf_options.level0_slowdown_writes_trigger; + if (extra_l0_ssts > 0) { + const auto num_L0_steps = mutable_cf_options.level0_stop_writes_trigger - + 
mutable_cf_options.level0_slowdown_writes_trigger; + assert(num_L0_steps > 0); + // since extra_l0_ssts == num_L0_steps then we're in a stop condition. + assert(extra_l0_ssts < num_L0_steps); + l0_divider = 1 / (1 - (extra_l0_ssts / num_L0_steps)); + } + + if (l0_divider > biggest_divider) { + biggest_divider = l0_divider; + write_stall_cause = WriteStallCause::kL0FileCountLimit; + } + + return biggest_divider; +} + WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( const MutableCFOptions& mutable_cf_options) { auto write_stall_condition = WriteStallCondition::kNormal; @@ -937,6 +1050,18 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( bool was_stopped = write_controller->IsStopped(); bool needed_delay = write_controller->NeedsDelay(); + bool dynamic_delay = write_controller->is_dynamic_delay(); + + // GetWriteStallConditionAndCause returns the first condition met, so its + // possible that a later condition will require a harder rate limiting. + // calculate all conditions with DynamicSetupDelay and reavaluate the + // write_stall_cause. this is only relevant in the kDelayed case. 
+ if (dynamic_delay && + write_stall_condition == WriteStallCondition::kDelayed) { + write_controller_token_ = + DynamicSetupDelay(write_controller, compaction_needed_bytes, + mutable_cf_options, write_stall_cause); + } if (write_stall_condition == WriteStallCondition::kStopped && write_stall_cause == WriteStallCause::kMemtableLimit) { @@ -971,10 +1096,12 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( name_.c_str(), compaction_needed_bytes); } else if (write_stall_condition == WriteStallCondition::kDelayed && write_stall_cause == WriteStallCause::kMemtableLimit) { - write_controller_token_ = - SetupDelay(write_controller, compaction_needed_bytes, - prev_compaction_needed_bytes_, was_stopped, - mutable_cf_options.disable_auto_compactions); + if (!dynamic_delay) { + write_controller_token_ = + SetupDelay(write_controller, compaction_needed_bytes, + prev_compaction_needed_bytes_, was_stopped, + mutable_cf_options.disable_auto_compactions); + } internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1); ROCKS_LOG_WARN( ioptions_.logger, @@ -986,13 +1113,15 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( write_controller->delayed_write_rate()); } else if (write_stall_condition == WriteStallCondition::kDelayed && write_stall_cause == WriteStallCause::kL0FileCountLimit) { - // L0 is the last two files from stopping. - bool near_stop = vstorage->l0_delay_trigger_count() >= - mutable_cf_options.level0_stop_writes_trigger - 2; - write_controller_token_ = - SetupDelay(write_controller, compaction_needed_bytes, - prev_compaction_needed_bytes_, was_stopped || near_stop, - mutable_cf_options.disable_auto_compactions); + if (!dynamic_delay) { + // L0 is the last two files from stopping. 
+ bool near_stop = vstorage->l0_delay_trigger_count() >= + mutable_cf_options.level0_stop_writes_trigger - 2; + write_controller_token_ = + SetupDelay(write_controller, compaction_needed_bytes, + prev_compaction_needed_bytes_, was_stopped || near_stop, + mutable_cf_options.disable_auto_compactions); + } internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1); if (compaction_picker_->IsLevel0CompactionInProgress()) { @@ -1009,18 +1138,21 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( // If the distance to hard limit is less than 1/4 of the gap between soft // and // hard bytes limit, we think it is near stop and speed up the slowdown. - bool near_stop = - mutable_cf_options.hard_pending_compaction_bytes_limit > 0 && - (compaction_needed_bytes - - mutable_cf_options.soft_pending_compaction_bytes_limit) > - 3 * (mutable_cf_options.hard_pending_compaction_bytes_limit - - mutable_cf_options.soft_pending_compaction_bytes_limit) / - 4; + if (!dynamic_delay) { + bool near_stop = + mutable_cf_options.hard_pending_compaction_bytes_limit > 0 && + (compaction_needed_bytes - + mutable_cf_options.soft_pending_compaction_bytes_limit) > + 3 * + (mutable_cf_options.hard_pending_compaction_bytes_limit - + mutable_cf_options.soft_pending_compaction_bytes_limit) / + 4; - write_controller_token_ = - SetupDelay(write_controller, compaction_needed_bytes, - prev_compaction_needed_bytes_, was_stopped || near_stop, - mutable_cf_options.disable_auto_compactions); + write_controller_token_ = + SetupDelay(write_controller, compaction_needed_bytes, + prev_compaction_needed_bytes_, was_stopped || near_stop, + mutable_cf_options.disable_auto_compactions); + } internal_stats_->AddCFStats( InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1); ROCKS_LOG_WARN( diff --git a/db/column_family.h b/db/column_family.h index 91a8253742..fc856610d1 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -485,6 +485,24 @@ class ColumnFamilyData { 
WriteStallCondition RecalculateWriteStallConditions( const MutableCFOptions& mutable_cf_options); + // REQUIREMENT: db mutex must be held + double TEST_CalculateWriteDelayDivider( + uint64_t compaction_needed_bytes, + const MutableCFOptions& mutable_cf_options, + WriteStallCause& write_stall_cause); + + private: + std::unique_ptr DynamicSetupDelay( + WriteController* write_controller, uint64_t compaction_needed_bytes, + const MutableCFOptions& mutable_cf_options, + WriteStallCause& write_stall_cause); + + double CalculateWriteDelayDividerAndMaybeUpdateWriteStallCause( + uint64_t compaction_needed_bytes, + const MutableCFOptions& mutable_cf_options, + WriteStallCause& write_stall_cause); + + public: void set_initialized() { initialized_.store(true); } bool initialized() const { return initialized_.load(); } diff --git a/db/column_family_test.cc b/db/column_family_test.cc index b13038ab1a..6fa33c1607 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -531,6 +531,20 @@ class ColumnFamilyTestBase : public testing::Test { dbfull()-> TEST_UnlockMutex(); } + double CalculateWriteDelayDivider( + ColumnFamilyData* cfd, uint64_t compaction_needed_bytes, + const MutableCFOptions& mutable_cf_options) { + // add lock to guard current_ (*Version) + ROCKSDB_NAMESPACE::ColumnFamilyData::WriteStallCause write_stall_cause = + ROCKSDB_NAMESPACE::ColumnFamilyData::WriteStallCause::kNone; + + dbfull()->TEST_LockMutex(); + double divider = cfd->TEST_CalculateWriteDelayDivider( + compaction_needed_bytes, mutable_cf_options, write_stall_cause); + dbfull()->TEST_UnlockMutex(); + return divider; + } + std::vector handles_; std::vector names_; std::vector> keys_; @@ -556,6 +570,67 @@ INSTANTIATE_TEST_CASE_P(FormatDef, ColumnFamilyTest, INSTANTIATE_TEST_CASE_P(FormatLatest, ColumnFamilyTest, testing::Values(kLatestFormatVersion)); +#define CALL_WRAPPER(func) \ + func; \ + ASSERT_FALSE(HasFailure()); + +// The params for this suite are the Format Version and whether +// 
use_dynamic_delay is used +class ColumnFamilyTestWithDynamic + : public ColumnFamilyTestBase, + virtual public ::testing::WithParamInterface> { + public: + ColumnFamilyTestWithDynamic() + : ColumnFamilyTestBase(std::get<0>(GetParam())) {} + + double SetDelayAndCalculateRate(ColumnFamilyData* cfd, + uint64_t pending_bytes_to_set, + int times_delayed, + const MutableCFOptions& mutable_cf_options, + bool expected_is_db_write_stopped, + bool expected_needs_delay, int l0_files = 0) { + VersionStorageInfo* vstorage = cfd->current()->storage_info(); + vstorage->TEST_set_estimated_compaction_needed_bytes(pending_bytes_to_set); + if (l0_files > 0) { + vstorage->set_l0_delay_trigger_count(l0_files); + } + RecalculateWriteStallConditions(cfd, mutable_cf_options); + + CheckAssertions(expected_is_db_write_stopped, expected_needs_delay); + + double rate_divider = 0; + if (db_options_.use_dynamic_delay && expected_needs_delay) { + rate_divider = CalculateWriteDelayDivider( + cfd, vstorage->estimated_compaction_needed_bytes(), + mutable_cf_options); + } else { + rate_divider = 1; + for (int i = 0; i < times_delayed; i++) { + // each time SetupDelay is called the rate is divided by + // kIncSlowdownRatio (0.8) + rate_divider *= 1.25; + } + } + return rate_divider; + } + + void CheckAssertions(bool expected_is_db_write_stopped, + bool expected_needs_delay) { + ASSERT_TRUE(IsDbWriteStopped() == expected_is_db_write_stopped); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay() == + expected_needs_delay); + } +}; + +INSTANTIATE_TEST_CASE_P( + FormatDef, ColumnFamilyTestWithDynamic, + testing::Combine(testing::Values(test::kDefaultFormatVersion), + testing::Bool())); + +INSTANTIATE_TEST_CASE_P(FormatLatest, ColumnFamilyTestWithDynamic, + testing::Combine(testing::Values(kLatestFormatVersion), + testing::Bool())); + TEST_P(ColumnFamilyTest, DontReuseColumnFamilyID) { for (int iter = 0; iter < 3; ++iter) { Open(); @@ -2736,11 +2811,15 @@ TEST_P(ColumnFamilyTest, CreateAndDropRace) 
{ } #endif // !ROCKSDB_LITE -TEST_P(ColumnFamilyTest, WriteStallSingleColumnFamily) { +namespace { +#define Gb *1073741824ull +} // namespace + +TEST_P(ColumnFamilyTestWithDynamic, WriteStallSingleColumnFamily) { const uint64_t kBaseRate = 800000u; db_options_.delayed_write_rate = kBaseRate; db_options_.max_background_compactions = 6; - + db_options_.use_dynamic_delay = std::get<1>(GetParam()); Open({"default"}); ColumnFamilyData* cfd = static_cast(db_->DefaultColumnFamily())->cfd(); @@ -2751,135 +2830,133 @@ TEST_P(ColumnFamilyTest, WriteStallSingleColumnFamily) { mutable_cf_options.level0_slowdown_writes_trigger = 20; mutable_cf_options.level0_stop_writes_trigger = 10000; - mutable_cf_options.soft_pending_compaction_bytes_limit = 200; - mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; + mutable_cf_options.soft_pending_compaction_bytes_limit = 200 Gb; + mutable_cf_options.hard_pending_compaction_bytes_limit = 2000 Gb; mutable_cf_options.disable_auto_compactions = false; - - vstorage->TEST_set_estimated_compaction_needed_bytes(50); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(201); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); + bool Stopped = true; + bool NotStopped = false; + bool Delayed = true; + bool NotDelayed = false; + double rate_divider; + + CALL_WRAPPER(SetDelayAndCalculateRate(cfd, 50 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, + NotDelayed)); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 201 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); ASSERT_EQ(6, 
dbfull()->TEST_BGCompactionsAllowed()); - vstorage->TEST_set_estimated_compaction_needed_bytes(400); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 400 Gb, 1 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); - vstorage->TEST_set_estimated_compaction_needed_bytes(500); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(450); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(205); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(202); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(201); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); - - 
vstorage->TEST_set_estimated_compaction_needed_bytes(198); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(399); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(599); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(2001); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(IsDbWriteStopped()); - ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 500 Gb, 2 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 450 Gb, 1 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 205 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 202 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 201 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, 
Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 198 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, NotDelayed)); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 399 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 599 Gb, 1 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 2001 Gb, 0 /* times_delayed*/, + mutable_cf_options, Stopped, NotDelayed)); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); - vstorage->TEST_set_estimated_compaction_needed_bytes(3001); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(IsDbWriteStopped()); - ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(390); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(100); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); - - vstorage->set_l0_delay_trigger_count(100); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 3001 Gb, 0 /* times_delayed*/, + mutable_cf_options, Stopped, NotDelayed)); + + rate_divider = 
CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 390 Gb, 1 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 100 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, NotDelayed)); + + rate_divider = CALL_WRAPPER(SetDelayAndCalculateRate( + cfd, 100 Gb, 0 /* times_delayed*/, mutable_cf_options, NotStopped, + Delayed, 100 /* l0_files*/)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); - vstorage->set_l0_delay_trigger_count(101); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); - - vstorage->set_l0_delay_trigger_count(0); - vstorage->TEST_set_estimated_compaction_needed_bytes(300); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate()); - - vstorage->set_l0_delay_trigger_count(101); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25 / 1.25 / 1.25, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(200); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate()); + rate_divider = CALL_WRAPPER(SetDelayAndCalculateRate( + cfd, 100 Gb, 1 /* times_delayed*/, mutable_cf_options, NotStopped, + Delayed, 101 /* l0_files*/)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + 
GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER(SetDelayAndCalculateRate( + cfd, 300 Gb, 2 /* times_delayed*/, mutable_cf_options, NotStopped, + Delayed, 0 /* l0_files*/)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER(SetDelayAndCalculateRate( + cfd, 300 Gb, 3 /* times_delayed*/, mutable_cf_options, NotStopped, + Delayed, 101 /* l0_files*/)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 200 Gb, 2 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); vstorage->set_l0_delay_trigger_count(0); - vstorage->TEST_set_estimated_compaction_needed_bytes(0); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 0 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, NotDelayed)); mutable_cf_options.disable_auto_compactions = true; dbfull()->TEST_write_controler().set_delayed_write_rate(kBaseRate); @@ -2887,39 +2964,37 @@ TEST_P(ColumnFamilyTest, WriteStallSingleColumnFamily) { ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); - vstorage->set_l0_delay_trigger_count(50); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + rate_divider = CALL_WRAPPER(SetDelayAndCalculateRate( + cfd, 0 Gb, 0 /* times_delayed*/, mutable_cf_options, NotStopped, + NotDelayed, 50 /* l0_files*/)); ASSERT_EQ(0, GetDbDelayedWriteRate()); - ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + dbfull()->TEST_write_controler().delayed_write_rate()); 
- vstorage->set_l0_delay_trigger_count(60); - vstorage->TEST_set_estimated_compaction_needed_bytes(300); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + rate_divider = CALL_WRAPPER(SetDelayAndCalculateRate( + cfd, 300 Gb, 0 /* times_delayed*/, mutable_cf_options, NotStopped, + NotDelayed, 60 /* l0_files*/)); ASSERT_EQ(0, GetDbDelayedWriteRate()); - ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + dbfull()->TEST_write_controler().delayed_write_rate()); mutable_cf_options.disable_auto_compactions = false; - vstorage->set_l0_delay_trigger_count(70); - vstorage->TEST_set_estimated_compaction_needed_bytes(500); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); - - vstorage->set_l0_delay_trigger_count(71); - vstorage->TEST_set_estimated_compaction_needed_bytes(501); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); + rate_divider = CALL_WRAPPER(SetDelayAndCalculateRate( + cfd, 500 Gb, 0 /* times_delayed*/, mutable_cf_options, NotStopped, + Delayed, 70 /* l0_files*/)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER(SetDelayAndCalculateRate( + cfd, 501 Gb, 1 /* times_delayed*/, mutable_cf_options, NotStopped, + Delayed, 71 /* l0_files*/)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); } -TEST_P(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) { +TEST_P(ColumnFamilyTestWithDynamic, CompactionSpeedupSingleColumnFamily) { db_options_.max_background_compactions = 6; + 
db_options_.use_dynamic_delay = std::get<1>(GetParam()); Open({"default"}); ColumnFamilyData* cfd = static_cast(db_->DefaultColumnFamily())->cfd(); @@ -2933,22 +3008,22 @@ TEST_P(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) { mutable_cf_options.level0_slowdown_writes_trigger = 36; mutable_cf_options.level0_stop_writes_trigger = 50; // Speedup threshold = 200 / 4 = 50 - mutable_cf_options.soft_pending_compaction_bytes_limit = 200; - mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; + mutable_cf_options.soft_pending_compaction_bytes_limit = 200 Gb; + mutable_cf_options.hard_pending_compaction_bytes_limit = 2000 Gb; - vstorage->TEST_set_estimated_compaction_needed_bytes(40); + vstorage->TEST_set_estimated_compaction_needed_bytes(40 Gb); RecalculateWriteStallConditions(cfd, mutable_cf_options); ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); - vstorage->TEST_set_estimated_compaction_needed_bytes(50); + vstorage->TEST_set_estimated_compaction_needed_bytes(50 Gb); RecalculateWriteStallConditions(cfd, mutable_cf_options); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); - vstorage->TEST_set_estimated_compaction_needed_bytes(300); + vstorage->TEST_set_estimated_compaction_needed_bytes(300 Gb); RecalculateWriteStallConditions(cfd, mutable_cf_options); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); - vstorage->TEST_set_estimated_compaction_needed_bytes(45); + vstorage->TEST_set_estimated_compaction_needed_bytes(45 Gb); RecalculateWriteStallConditions(cfd, mutable_cf_options); ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); @@ -2982,85 +3057,87 @@ TEST_P(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) { ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); } -TEST_P(ColumnFamilyTest, WriteStallTwoColumnFamilies) { +TEST_P(ColumnFamilyTestWithDynamic, WriteStallTwoColumnFamilies) { const uint64_t kBaseRate = 810000u; db_options_.delayed_write_rate = kBaseRate; + db_options_.use_dynamic_delay = std::get<1>(GetParam()); Open(); 
CreateColumnFamilies({"one"}); ColumnFamilyData* cfd = static_cast(db_->DefaultColumnFamily())->cfd(); - VersionStorageInfo* vstorage = cfd->current()->storage_info(); ColumnFamilyData* cfd1 = static_cast(handles_[1])->cfd(); - VersionStorageInfo* vstorage1 = cfd1->current()->storage_info(); MutableCFOptions mutable_cf_options(column_family_options_); mutable_cf_options.level0_slowdown_writes_trigger = 20; mutable_cf_options.level0_stop_writes_trigger = 10000; - mutable_cf_options.soft_pending_compaction_bytes_limit = 200; - mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; + mutable_cf_options.soft_pending_compaction_bytes_limit = 200 Gb; + mutable_cf_options.hard_pending_compaction_bytes_limit = 2000 Gb; MutableCFOptions mutable_cf_options1 = mutable_cf_options; - mutable_cf_options1.soft_pending_compaction_bytes_limit = 500; - - vstorage->TEST_set_estimated_compaction_needed_bytes(50); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); - - vstorage1->TEST_set_estimated_compaction_needed_bytes(201); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); - - vstorage1->TEST_set_estimated_compaction_needed_bytes(600); - RecalculateWriteStallConditions(cfd1, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(70); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); - - vstorage1->TEST_set_estimated_compaction_needed_bytes(800); - RecalculateWriteStallConditions(cfd1, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); 
- ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(300); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate()); - - vstorage1->TEST_set_estimated_compaction_needed_bytes(700); - RecalculateWriteStallConditions(cfd1, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(500); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate()); - - vstorage1->TEST_set_estimated_compaction_needed_bytes(600); - RecalculateWriteStallConditions(cfd1, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); + mutable_cf_options1.soft_pending_compaction_bytes_limit = 500 Gb; + bool NotStopped = false; + bool Delayed = true; + bool NotDelayed = false; + double rate_divider; + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 50 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, NotDelayed)); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd1, 201 Gb, 0 /* times_delayed*/, + mutable_cf_options1, NotStopped, NotDelayed)); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd1, 600 Gb, 0 /* times_delayed*/, + mutable_cf_options1, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 70 Gb, 0 /* 
times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd1, 800 Gb, 1 /* times_delayed*/, + mutable_cf_options1, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 300 Gb, 2 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd1, 700 Gb, 1 /* times_delayed*/, + mutable_cf_options1, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 500 Gb, 2 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd1, 600 Gb, 1 /* times_delayed*/, + mutable_cf_options1, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); } -TEST_P(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) { +TEST_P(ColumnFamilyTestWithDynamic, CompactionSpeedupTwoColumnFamilies) { db_options_.max_background_compactions = 6; column_family_options_.soft_pending_compaction_bytes_limit = 200; column_family_options_.hard_pending_compaction_bytes_limit = 2000; + db_options_.use_dynamic_delay = std::get<1>(GetParam()); Open(); CreateColumnFamilies({"one"}); ColumnFamilyData* cfd = @@ -3077,36 +3154,36 @@ TEST_P(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) { mutable_cf_options.level0_slowdown_writes_trigger = 36; mutable_cf_options.level0_stop_writes_trigger = 30; // Speedup threshold = 200 / 4 = 50 - mutable_cf_options.soft_pending_compaction_bytes_limit = 200; - 
mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; + mutable_cf_options.soft_pending_compaction_bytes_limit = 200 Gb; + mutable_cf_options.hard_pending_compaction_bytes_limit = 2000 Gb; MutableCFOptions mutable_cf_options1 = mutable_cf_options; mutable_cf_options1.level0_slowdown_writes_trigger = 16; - vstorage->TEST_set_estimated_compaction_needed_bytes(40); + vstorage->TEST_set_estimated_compaction_needed_bytes(40 Gb); RecalculateWriteStallConditions(cfd, mutable_cf_options); ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); - vstorage->TEST_set_estimated_compaction_needed_bytes(60); - RecalculateWriteStallConditions(cfd1, mutable_cf_options); + vstorage->TEST_set_estimated_compaction_needed_bytes(60 Gb); + RecalculateWriteStallConditions(cfd1, mutable_cf_options1); ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); RecalculateWriteStallConditions(cfd, mutable_cf_options); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); - vstorage1->TEST_set_estimated_compaction_needed_bytes(30); - RecalculateWriteStallConditions(cfd1, mutable_cf_options); + vstorage1->TEST_set_estimated_compaction_needed_bytes(30 Gb); + RecalculateWriteStallConditions(cfd1, mutable_cf_options1); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); - vstorage1->TEST_set_estimated_compaction_needed_bytes(70); - RecalculateWriteStallConditions(cfd1, mutable_cf_options); + vstorage1->TEST_set_estimated_compaction_needed_bytes(70 Gb); + RecalculateWriteStallConditions(cfd1, mutable_cf_options1); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); - vstorage->TEST_set_estimated_compaction_needed_bytes(20); + vstorage->TEST_set_estimated_compaction_needed_bytes(20 Gb); RecalculateWriteStallConditions(cfd, mutable_cf_options); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); - vstorage1->TEST_set_estimated_compaction_needed_bytes(3); - RecalculateWriteStallConditions(cfd1, mutable_cf_options); + vstorage1->TEST_set_estimated_compaction_needed_bytes(3 Gb); + 
RecalculateWriteStallConditions(cfd1, mutable_cf_options1); ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); vstorage->set_l0_delay_trigger_count(9); @@ -3114,7 +3191,7 @@ TEST_P(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) { ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); vstorage1->set_l0_delay_trigger_count(2); - RecalculateWriteStallConditions(cfd1, mutable_cf_options); + RecalculateWriteStallConditions(cfd1, mutable_cf_options1); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); vstorage->set_l0_delay_trigger_count(0); diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 772c52d110..af8560915c 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -211,6 +211,7 @@ class CompactionJobTestBase : public testing::Test { mutable_cf_options_(cf_options_), mutable_db_options_(), table_cache_(NewLRUCache(50000, 16)), + write_controller_(db_options_.use_dynamic_delay), write_buffer_manager_(db_options_.db_write_buffer_size), versions_(new VersionSet( dbname_, &db_options_, env_options_, table_cache_.get(), diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 9fcf3c9a5f..4474bb4fed 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -197,7 +197,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()), write_thread_(immutable_db_options_), nonmem_write_thread_(immutable_db_options_), - write_controller_(mutable_db_options_.delayed_write_rate), + write_controller_(immutable_db_options_.use_dynamic_delay, + mutable_db_options_.delayed_write_rate), last_batch_group_size_(0), unscheduled_flushes_(0), unscheduled_compactions_(0), diff --git a/db/db_options_test.cc b/db/db_options_test.cc index 212b8e8124..f1e9863e0f 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -826,6 +826,7 @@ TEST_F(DBOptionsTest, SanitizeDelayedWriteRate) { Options 
options; options.env = CurrentOptions().env; options.delayed_write_rate = 0; + options.use_dynamic_delay = false; Reopen(options); ASSERT_EQ(16 * 1024 * 1024, dbfull()->GetDBOptions().delayed_write_rate); diff --git a/db/db_test.cc b/db/db_test.cc index 52f34f0fa9..6ed35ea4b9 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -6471,6 +6471,7 @@ TEST_F(DBTest, DelayedWriteRate) { options.delayed_write_rate = 20000000; // Start with 200MB/s options.memtable_factory.reset( test::NewSpecialSkipListFactory(kEntriesPerMemTable)); + options.use_dynamic_delay = false; SetTimeElapseOnlySleepOnReopen(&options); CreateAndReopenWithCF({"pikachu"}, options); diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index f55726dad1..8c3ea9aad6 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -1259,7 +1259,7 @@ class RecoveryTestHelper { std::unique_ptr versions; std::unique_ptr wal_manager; - WriteController write_controller; + WriteController write_controller(db_options.use_dynamic_delay); versions.reset(new VersionSet( test->dbname_, &db_options, file_options, table_cache.get(), diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 4b35bc9b40..174ffc8d4e 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -40,6 +40,7 @@ class FlushJobTestBase : public testing::Test { db_options_(options_), column_family_names_({kDefaultColumnFamilyName, "foo", "bar"}), table_cache_(NewLRUCache(50000, 16)), + write_controller_(db_options_.use_dynamic_delay), write_buffer_manager_(db_options_.db_write_buffer_size), shutting_down_(false), mock_table_factory_(new mock::MockTableFactory()) {} diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index b72113f7d0..931aa88a9c 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -98,7 +98,8 @@ class MemTableListTest : public testing::Test { EnvOptions env_options; std::shared_ptr table_cache(NewLRUCache(50000, 16)); WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size); - 
WriteController write_controller(10000000u); + WriteController write_controller(immutable_db_options.use_dynamic_delay, + 10000000u); VersionSet versions(dbname, &immutable_db_options, env_options, table_cache.get(), &write_buffer_manager, @@ -149,7 +150,8 @@ class MemTableListTest : public testing::Test { EnvOptions env_options; std::shared_ptr table_cache(NewLRUCache(50000, 16)); WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size); - WriteController write_controller(10000000u); + WriteController write_controller(immutable_db_options.use_dynamic_delay, + 10000000u); VersionSet versions(dbname, &immutable_db_options, env_options, table_cache.get(), &write_buffer_manager, diff --git a/db/repair.cc b/db/repair.cc index 34da5ba05f..a34d3e7fb6 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -118,7 +118,7 @@ class Repairer { /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, db_session_id_)), wb_(db_options_.db_write_buffer_size), - wc_(db_options_.delayed_write_rate), + wc_(db_options_.use_dynamic_delay, db_options_.delayed_write_rate), vset_(dbname_, &immutable_db_options_, file_options_, raw_table_cache_.get(), &wb_, &wc_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, diff --git a/db/version_set.cc b/db/version_set.cc index d39b5d31e3..dc4a0d53d6 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -5798,7 +5798,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, ColumnFamilyOptions cf_options(*options); std::shared_ptr tc(NewLRUCache(options->max_open_files - 10, options->table_cache_numshardbits)); - WriteController wc(options->delayed_write_rate); + WriteController wc(db_options.use_dynamic_delay, options->delayed_write_rate); WriteBufferManager wb(options->db_write_buffer_size); VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc, nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/, diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 906938b3b5..1ec4063c58 100644 --- 
a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -1127,6 +1127,7 @@ class VersionSetTestBase { immutable_options_(db_options_, cf_options_), mutable_cf_options_(cf_options_), table_cache_(NewLRUCache(50000, 16)), + write_controller_(db_options_.use_dynamic_delay), write_buffer_manager_(db_options_.db_write_buffer_size), shutting_down_(false), mock_table_factory_(std::make_shared()) { diff --git a/db/version_util.h b/db/version_util.h index 6a809834dc..88dd709403 100644 --- a/db/version_util.h +++ b/db/version_util.h @@ -18,7 +18,7 @@ namespace ROCKSDB_NAMESPACE { class OfflineManifestWriter { public: OfflineManifestWriter(const DBOptions& options, const std::string& db_path) - : wc_(options.delayed_write_rate), + : wc_(options.use_dynamic_delay, options.delayed_write_rate), wb_(options.db_write_buffer_size), immutable_db_options_(WithDbPath(options, db_path)), tc_(NewLRUCache(1 << 20 /* capacity */, diff --git a/db/wal_manager_test.cc b/db/wal_manager_test.cc index cbf866ce36..665ffa413a 100644 --- a/db/wal_manager_test.cc +++ b/db/wal_manager_test.cc @@ -34,6 +34,7 @@ class WalManagerTest : public testing::Test { WalManagerTest() : dbname_(test::PerThreadDBPath("wal_manager_test")), db_options_(), + write_controller_(db_options_.use_dynamic_delay), table_cache_(NewLRUCache(50000, 16)), write_buffer_manager_(db_options_.db_write_buffer_size), current_log_number_(0) { diff --git a/db/write_controller.h b/db/write_controller.h index c32b70b941..8fd8a36ce2 100644 --- a/db/write_controller.h +++ b/db/write_controller.h @@ -22,13 +22,15 @@ class WriteControllerToken; // to be called while holding DB mutex class WriteController { public: - explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u, + explicit WriteController(bool dynamic_delay, + uint64_t _delayed_write_rate = 1024u * 1024u * 32u, int64_t low_pri_rate_bytes_per_sec = 1024 * 1024) : total_stopped_(0), total_delayed_(0), total_compaction_pressure_(0), credit_in_bytes_(0), 
next_refill_time_(0), + dynamic_delay_(dynamic_delay), low_pri_rate_limiter_( NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) { set_max_delayed_write_rate(_delayed_write_rate); @@ -84,6 +86,8 @@ class WriteController { RateLimiter* low_pri_rate_limiter() { return low_pri_rate_limiter_.get(); } + bool is_dynamic_delay() const { return dynamic_delay_; } + private: uint64_t NowMicrosMonotonic(SystemClock* clock); @@ -104,6 +108,8 @@ class WriteController { uint64_t max_delayed_write_rate_; // Current write rate (bytes / second) uint64_t delayed_write_rate_; + // Whether Speedb's dynamic delay is used + bool dynamic_delay_; std::unique_ptr low_pri_rate_limiter_; }; diff --git a/db/write_controller_test.cc b/db/write_controller_test.cc index 1f7cf999aa..8ae3611935 100644 --- a/db/write_controller_test.cc +++ b/db/write_controller_test.cc @@ -21,7 +21,8 @@ class TimeSetClock : public SystemClockWrapper { uint64_t NowNanos() override { return now_micros_ * std::milli::den; } }; } // namespace -class WriteControllerTest : public testing::Test { +// The param is whether dynamic_delay is used or not +class WriteControllerTest : public testing::TestWithParam { public: WriteControllerTest() { clock_ = std::make_shared(); } std::shared_ptr clock_; @@ -33,8 +34,8 @@ class WriteControllerTest : public testing::Test { #define MBPS MILLION #define SECS MILLION // in microseconds -TEST_F(WriteControllerTest, BasicAPI) { - WriteController controller(40 MBPS); // also set max delayed rate +TEST_P(WriteControllerTest, BasicAPI) { + WriteController controller(GetParam(), 40 MBPS); // also set max delayed rate EXPECT_EQ(controller.delayed_write_rate(), 40 MBPS); EXPECT_FALSE(controller.IsStopped()); EXPECT_FALSE(controller.NeedsDelay()); @@ -106,8 +107,8 @@ TEST_F(WriteControllerTest, BasicAPI) { EXPECT_FALSE(controller.NeedsDelay()); } -TEST_F(WriteControllerTest, StartFilled) { - WriteController controller(10 MBPS); +TEST_P(WriteControllerTest, StartFilled) { + WriteController 
controller(GetParam(), 10 MBPS); // Attempt to write two things that combined would be allowed within // a single refill interval @@ -132,8 +133,8 @@ TEST_F(WriteControllerTest, StartFilled) { EXPECT_LT(1.0 * delay, 1.001 SECS); } -TEST_F(WriteControllerTest, DebtAccumulation) { - WriteController controller(10 MBPS); +TEST_P(WriteControllerTest, DebtAccumulation) { + WriteController controller(GetParam(), 10 MBPS); std::array, 10> tokens; @@ -192,8 +193,8 @@ TEST_F(WriteControllerTest, DebtAccumulation) { } // This may or may not be a "good" feature, but it's an old feature -TEST_F(WriteControllerTest, CreditAccumulation) { - WriteController controller(10 MBPS); +TEST_P(WriteControllerTest, CreditAccumulation) { + WriteController controller(GetParam(), 10 MBPS); std::array, 10> tokens; @@ -238,6 +239,7 @@ TEST_F(WriteControllerTest, CreditAccumulation) { tokens[0] = controller.GetDelayToken(1 MBPS); ASSERT_EQ(10 SECS, controller.GetDelay(clock_.get(), 10 MB)); } +INSTANTIATE_TEST_CASE_P(DynamicWC, WriteControllerTest, testing::Bool()); } // namespace ROCKSDB_NAMESPACE diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 2ef3adc52c..c1a60686d8 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -252,6 +252,7 @@ DECLARE_int32(get_property_one_in); DECLARE_string(file_checksum_impl); DECLARE_int32(data_block_index_type); DECLARE_double(data_block_hash_table_util_ratio); +DECLARE_bool(use_dynamic_delay); #ifndef ROCKSDB_LITE // Options for StackableDB-based BlobDB diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 9a4aad2ad2..469478cd20 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -1063,4 +1063,7 @@ DEFINE_uint64(stats_dump_period_sec, ROCKSDB_NAMESPACE::Options().stats_dump_period_sec, "Gap between printing stats to log in seconds"); +DEFINE_bool(use_dynamic_delay, 
ROCKSDB_NAMESPACE::Options().use_dynamic_delay, + "Use dynamic delay"); + #endif // GFLAGS diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 3153658945..96b15a1c9c 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -3126,6 +3126,7 @@ void InitializeOptionsFromFlags( FLAGS_verify_sst_unique_id_in_manifest; options.memtable_protection_bytes_per_key = FLAGS_memtable_protection_bytes_per_key; + options.use_dynamic_delay = FLAGS_use_dynamic_delay; // Integrated BlobDB options.enable_blob_files = FLAGS_enable_blob_files; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index acbc991677..9aa83cd12e 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1052,6 +1052,19 @@ struct DBOptions { // Dynamically changeable through SetDBOptions() API. uint64_t delayed_write_rate = 0; + // Use Speedb's dynamic delay - + // https://github.com/speedb-io/speedb/issues/276. Setting this to true, + // enables a different kind of calculation (instead of SetupDelay) for the + // delayed_write_rate whenever a call to RecalculateWriteStallConditions is + // made. The calculation itself is explained in the ticket and in the code of + // CalculateWriteDelayDividerAndMaybeUpdateWriteStallCause but in general it's + // a linear decline of write speed with regards to by how much the system + // CURRENTLY exceeds the slowdown (soft_pending_compaction_bytes_limit and + // level0_slowdown_writes_trigger). + // + // Default: true + bool use_dynamic_delay = true; + // By default, a single write thread queue is maintained. The thread gets + // to the head of the queue becomes write batch group leader and responsible + // for writing to WAL and memtable for the batch group.
diff --git a/options/db_options.cc b/options/db_options.cc index 840ade7ea7..b328f5761f 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -558,6 +558,10 @@ static std::unordered_map {offsetof(struct ImmutableDBOptions, enforce_single_del_contracts), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"use_dynamic_delay", + {offsetof(struct ImmutableDBOptions, use_dynamic_delay), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, }; const std::string OptionsHelper::kDBOptionsName = "DBOptions"; @@ -758,6 +762,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) checksum_handoff_file_types(options.checksum_handoff_file_types), lowest_used_cache_tier(options.lowest_used_cache_tier), compaction_service(options.compaction_service), + use_dynamic_delay(options.use_dynamic_delay), enforce_single_del_contracts(options.enforce_single_del_contracts) { fs = env->GetFileSystem(); clock = env->GetSystemClock().get(); @@ -844,6 +849,8 @@ void ImmutableDBOptions::Dump(Logger* log) const { is_fd_close_on_exec); ROCKS_LOG_HEADER(log, " Options.advise_random_on_open: %d", advise_random_on_open); + ROCKS_LOG_HEADER(log, " Options.use_dynamic_delay: %d", + use_dynamic_delay); ROCKS_LOG_HEADER( log, " Options.db_write_buffer_size: %" ROCKSDB_PRIszt, db_write_buffer_size); diff --git a/options/db_options.h b/options/db_options.h index 8946f60ff4..837127ce60 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -105,6 +105,7 @@ struct ImmutableDBOptions { Statistics* stats; Logger* logger; std::shared_ptr compaction_service; + bool use_dynamic_delay; bool enforce_single_del_contracts; bool IsWalDirSameAsDBPath() const; diff --git a/options/options_helper.cc b/options/options_helper.cc index efb1d382e8..f3f6d27cb8 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -135,6 +135,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, 
options.listeners = immutable_db_options.listeners; options.enable_thread_tracking = immutable_db_options.enable_thread_tracking; options.delayed_write_rate = mutable_db_options.delayed_write_rate; + options.use_dynamic_delay = immutable_db_options.use_dynamic_delay; options.enable_pipelined_write = immutable_db_options.enable_pipelined_write; options.unordered_write = immutable_db_options.unordered_write; options.allow_concurrent_memtable_write = diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 08e86e7fda..f6502f12cb 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -360,7 +360,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "db_host_id=hostname;" "lowest_used_cache_tier=kNonVolatileBlockTier;" "allow_data_in_errors=false;" - "enforce_single_del_contracts=false;", + "enforce_single_del_contracts=false;" + "use_dynamic_delay=true", new_options)); ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions), diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index c1a813420a..1b2475a1e5 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1522,6 +1522,9 @@ DEFINE_uint64(delayed_write_rate, 8388608u, "Limited bytes allowed to DB when soft_rate_limit or " "level0_slowdown_writes_trigger triggers"); +DEFINE_bool(use_dynamic_delay, ROCKSDB_NAMESPACE::Options().use_dynamic_delay, + "use dynamic delay"); + DEFINE_bool(enable_pipelined_write, true, "Allow WAL and memtable writes to be pipelined"); @@ -4818,6 +4821,7 @@ class Benchmark { options.inplace_update_num_locks = FLAGS_inplace_update_num_locks; options.enable_write_thread_adaptive_yield = FLAGS_enable_write_thread_adaptive_yield; + options.use_dynamic_delay = FLAGS_use_dynamic_delay; options.enable_pipelined_write = FLAGS_enable_pipelined_write; options.unordered_write = FLAGS_unordered_write; options.write_thread_max_yield_usec = FLAGS_write_thread_max_yield_usec; diff --git 
a/tools/db_crashtest.py b/tools/db_crashtest.py index 113f30f4c1..370878a94b 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -207,6 +207,7 @@ "customopspercent": 0, "filter_uri": lambda: random.choice(["speedb.PairedBloomFilter", ""]), "memtablerep": lambda: random.choice(["skip_list", "speedb.HashSpdRepFactory"]), + "use_dynamic_delay": lambda: random.choice([0, 1, 1, 1]), } _TEST_DIR_ENV_VAR = 'TEST_TMPDIR' diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 4adba6e9b1..ac7cf14201 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -1322,7 +1322,7 @@ void DumpManifestFile(Options options, std::string file, bool verbose, bool hex, // SanitizeOptions(), we need to initialize it manually. options.db_paths.emplace_back("dummy", 0); options.num_levels = 64; - WriteController wc(options.delayed_write_rate); + WriteController wc(options.use_dynamic_delay, options.delayed_write_rate); WriteBufferManager wb(options.db_write_buffer_size); ImmutableDBOptions immutable_db_options(options); VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc, @@ -1464,7 +1464,7 @@ Status GetLiveFilesChecksumInfoFromVersionSet(Options options, // SanitizeOptions(), we need to initialize it manually. 
options.db_paths.emplace_back(db_path, 0); options.num_levels = 64; - WriteController wc(options.delayed_write_rate); + WriteController wc(options.use_dynamic_delay, options.delayed_write_rate); WriteBufferManager wb(options.db_write_buffer_size); ImmutableDBOptions immutable_db_options(options); VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc, @@ -2272,7 +2272,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt, std::shared_ptr tc( NewLRUCache(opt.max_open_files - 10, opt.table_cache_numshardbits)); const InternalKeyComparator cmp(opt.comparator); - WriteController wc(opt.delayed_write_rate); + WriteController wc(opt.use_dynamic_delay, opt.delayed_write_rate); WriteBufferManager wb(opt.db_write_buffer_size); VersionSet versions(db_path_, &db_options, soptions, tc.get(), &wb, &wc, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, diff --git a/tools/ldb_cmd_test.cc b/tools/ldb_cmd_test.cc index cf133b7e47..e7ab162210 100644 --- a/tools/ldb_cmd_test.cc +++ b/tools/ldb_cmd_test.cc @@ -204,7 +204,7 @@ class FileChecksumTestHelper { options_.table_cache_numshardbits)); options_.db_paths.emplace_back(dbname_, 0); options_.num_levels = 64; - WriteController wc(options_.delayed_write_rate); + WriteController wc(options_.use_dynamic_delay, options_.delayed_write_rate); WriteBufferManager wb(options_.db_write_buffer_size); ImmutableDBOptions immutable_db_options(options_); VersionSet versions(dbname_, &immutable_db_options, sopt, tc.get(), &wb,