diff --git a/db/column_family.cc b/db/column_family.cc index 2bc4d397eb..49586cded9 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -881,6 +881,39 @@ int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger, } } // namespace +namespace { +const uint64_t Gb = 1ull << 30; +const uint64_t Mb = 1ull << 20; +const uint64_t kMinWriteRate = 16 * 1024u; // Minimum write rate 16KB/s. +const int kMemtablePenalty = 10; +} // namespace + +double ColumnFamilyData::TEST_CalculateWriteDelayDivider( + uint64_t compaction_needed_bytes, + const MutableCFOptions& mutable_cf_options, + WriteStallCause& write_stall_cause) { + return CalculateWriteDelayDividerAndMaybeUpdateWriteStallCause( + compaction_needed_bytes, mutable_cf_options, write_stall_cause); +} + +std::unique_ptr<WriteControllerToken> ColumnFamilyData::DynamicSetupDelay( + WriteController* write_controller, uint64_t compaction_needed_bytes, + const MutableCFOptions& mutable_cf_options, + WriteStallCause& write_stall_cause) { + uint64_t max_write_rate = write_controller->max_delayed_write_rate(); + + const double rate_divider = + CalculateWriteDelayDividerAndMaybeUpdateWriteStallCause( + compaction_needed_bytes, mutable_cf_options, write_stall_cause); + assert(rate_divider >= 1); + auto write_rate = static_cast<uint64_t>(max_write_rate / rate_divider); + if (write_rate < kMinWriteRate) { + write_rate = kMinWriteRate; + } + + return write_controller->GetDelayToken(write_rate); +} + std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause> ColumnFamilyData::GetWriteStallConditionAndCause( int num_unflushed_memtables, int num_l0_files, @@ -919,6 +952,86 @@ ColumnFamilyData::GetWriteStallConditionAndCause( return {WriteStallCondition::kNormal, WriteStallCause::kNone}; } +// Delay divider is by how much we divide the user's delayed_write_rate. +// E.g. divider 10 will result in 10 Mb/s from the user's 100 Mb/s. +// The rate is reduced linearly according to the range from slowdown to stop. 
+double +ColumnFamilyData::CalculateWriteDelayDividerAndMaybeUpdateWriteStallCause( + uint64_t compaction_needed_bytes, + const MutableCFOptions& mutable_cf_options, + ColumnFamilyData::WriteStallCause& write_stall_cause) { + assert(current_ != nullptr); + + const auto* vstorage = current_->storage_info(); + + // Memtables + // this can only be entered when we're at the last memtable and there's more + // than 3. delay by 10X when writing to the last memtable. + double memtable_divider = 1; + auto num_unflushed_memtables = imm()->NumNotFlushed(); + if (mutable_cf_options.max_write_buffer_number > 3 && + num_unflushed_memtables >= + mutable_cf_options.max_write_buffer_number - 1 && + num_unflushed_memtables - 1 >= + ioptions_.min_write_buffer_number_to_merge) { + memtable_divider = kMemtablePenalty; + } + + // Pending Compaction Bytes + double pending_divider = 1; + auto soft_limit = mutable_cf_options.soft_pending_compaction_bytes_limit; + if (soft_limit > 0 && compaction_needed_bytes >= soft_limit) { + auto hard_limit = mutable_cf_options.hard_pending_compaction_bytes_limit; + // soft_limit != hard_limit here. we're in a kDelayed state here and not + // a stop. + assert(hard_limit > soft_limit); + uint64_t soft_hard_range = hard_limit - soft_limit; + // change rate every 1G change or 100Mb if soft_hard_range is too small. + auto step_size = soft_hard_range > Gb ? 
Gb : 100 * Mb; + uint64_t num_steps = soft_hard_range / step_size; + auto extra_bytes = compaction_needed_bytes - soft_limit; + auto step_num = static_cast<uint64_t>(extra_bytes / step_size); + assert(step_num < num_steps); + if (num_steps > 0) { + pending_divider = 1 / (1 - (static_cast<double>(step_num) / num_steps)); + } + } + + double biggest_divider = 1; + if (memtable_divider > pending_divider) { + biggest_divider = memtable_divider; + write_stall_cause = WriteStallCause::kMemtableLimit; + } else if (pending_divider > 1) { + biggest_divider = pending_divider; + write_stall_cause = WriteStallCause::kPendingCompactionBytes; + } + + // don't delay based on L0 when the user disables auto compactions + if (mutable_cf_options.disable_auto_compactions) { + return biggest_divider; + } + + // L0 files + double l0_divider = 1; + const auto extra_l0_ssts = vstorage->NumLevelFiles(0) - + mutable_cf_options.level0_slowdown_writes_trigger; + if (extra_l0_ssts > 0) { + const auto num_L0_steps = mutable_cf_options.level0_stop_writes_trigger - + mutable_cf_options.level0_slowdown_writes_trigger; + assert(num_L0_steps > 0); + // if extra_l0_ssts == num_L0_steps then we're in a stop condition. 
+ assert(extra_l0_ssts < num_L0_steps); + l0_divider = 1 / (1 - (static_cast<double>(extra_l0_ssts) / num_L0_steps)); + } + + if (l0_divider > biggest_divider) { + biggest_divider = l0_divider; + write_stall_cause = WriteStallCause::kL0FileCountLimit; + } + + return biggest_divider; +} + WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( const MutableCFOptions& mutable_cf_options) { auto write_stall_condition = WriteStallCondition::kNormal; @@ -937,6 +1050,18 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( bool was_stopped = write_controller->IsStopped(); bool needed_delay = write_controller->NeedsDelay(); + bool dynamic_delay = write_controller->is_dynamic_delay(); + + // GetWriteStallConditionAndCause returns the first condition met, so it's + // possible that a later condition will require a harder rate limiting. + // calculate all conditions with DynamicSetupDelay and reevaluate the + // write_stall_cause. this is only relevant in the kDelayed case. + if (dynamic_delay && + write_stall_condition == WriteStallCondition::kDelayed) { + write_controller_token_ = + DynamicSetupDelay(write_controller, compaction_needed_bytes, + mutable_cf_options, write_stall_cause); + } if (write_stall_condition == WriteStallCondition::kStopped && write_stall_cause == WriteStallCause::kMemtableLimit) { @@ -971,10 +1096,12 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( name_.c_str(), compaction_needed_bytes); } else if (write_stall_condition == WriteStallCondition::kDelayed && write_stall_cause == WriteStallCause::kMemtableLimit) { - write_controller_token_ = - SetupDelay(write_controller, compaction_needed_bytes, - prev_compaction_needed_bytes_, was_stopped, - mutable_cf_options.disable_auto_compactions); + if (!dynamic_delay) { + write_controller_token_ = + SetupDelay(write_controller, compaction_needed_bytes, + prev_compaction_needed_bytes_, was_stopped, + mutable_cf_options.disable_auto_compactions); + } 
internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1); ROCKS_LOG_WARN( ioptions_.logger, @@ -986,13 +1113,15 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( write_controller->delayed_write_rate()); } else if (write_stall_condition == WriteStallCondition::kDelayed && write_stall_cause == WriteStallCause::kL0FileCountLimit) { - // L0 is the last two files from stopping. - bool near_stop = vstorage->l0_delay_trigger_count() >= - mutable_cf_options.level0_stop_writes_trigger - 2; - write_controller_token_ = - SetupDelay(write_controller, compaction_needed_bytes, - prev_compaction_needed_bytes_, was_stopped || near_stop, - mutable_cf_options.disable_auto_compactions); + if (!dynamic_delay) { + // L0 is the last two files from stopping. + bool near_stop = vstorage->l0_delay_trigger_count() >= + mutable_cf_options.level0_stop_writes_trigger - 2; + write_controller_token_ = + SetupDelay(write_controller, compaction_needed_bytes, + prev_compaction_needed_bytes_, was_stopped || near_stop, + mutable_cf_options.disable_auto_compactions); + } internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1); if (compaction_picker_->IsLevel0CompactionInProgress()) { @@ -1009,18 +1138,21 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( // If the distance to hard limit is less than 1/4 of the gap between soft // and // hard bytes limit, we think it is near stop and speed up the slowdown. 
- bool near_stop = - mutable_cf_options.hard_pending_compaction_bytes_limit > 0 && - (compaction_needed_bytes - - mutable_cf_options.soft_pending_compaction_bytes_limit) > - 3 * (mutable_cf_options.hard_pending_compaction_bytes_limit - - mutable_cf_options.soft_pending_compaction_bytes_limit) / - 4; + if (!dynamic_delay) { + bool near_stop = + mutable_cf_options.hard_pending_compaction_bytes_limit > 0 && + (compaction_needed_bytes - + mutable_cf_options.soft_pending_compaction_bytes_limit) > + 3 * + (mutable_cf_options.hard_pending_compaction_bytes_limit - + mutable_cf_options.soft_pending_compaction_bytes_limit) / + 4; - write_controller_token_ = - SetupDelay(write_controller, compaction_needed_bytes, - prev_compaction_needed_bytes_, was_stopped || near_stop, - mutable_cf_options.disable_auto_compactions); + write_controller_token_ = + SetupDelay(write_controller, compaction_needed_bytes, + prev_compaction_needed_bytes_, was_stopped || near_stop, + mutable_cf_options.disable_auto_compactions); + } internal_stats_->AddCFStats( InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1); ROCKS_LOG_WARN( diff --git a/db/column_family.h b/db/column_family.h index 91a8253742..fc856610d1 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -485,6 +485,24 @@ class ColumnFamilyData { WriteStallCondition RecalculateWriteStallConditions( const MutableCFOptions& mutable_cf_options); + // REQUIREMENT: db mutex must be held + double TEST_CalculateWriteDelayDivider( + uint64_t compaction_needed_bytes, + const MutableCFOptions& mutable_cf_options, + WriteStallCause& write_stall_cause); + + private: + std::unique_ptr<WriteControllerToken> DynamicSetupDelay( + WriteController* write_controller, uint64_t compaction_needed_bytes, + const MutableCFOptions& mutable_cf_options, + WriteStallCause& write_stall_cause); + + double CalculateWriteDelayDividerAndMaybeUpdateWriteStallCause( + uint64_t compaction_needed_bytes, + const MutableCFOptions& mutable_cf_options, + WriteStallCause& 
write_stall_cause); + + public: void set_initialized() { initialized_.store(true); } bool initialized() const { return initialized_.load(); } diff --git a/db/column_family_test.cc b/db/column_family_test.cc index b13038ab1a..6fa33c1607 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -531,6 +531,20 @@ class ColumnFamilyTestBase : public testing::Test { dbfull()-> TEST_UnlockMutex(); } + double CalculateWriteDelayDivider( + ColumnFamilyData* cfd, uint64_t compaction_needed_bytes, + const MutableCFOptions& mutable_cf_options) { + // add lock to guard current_ (*Version) + ROCKSDB_NAMESPACE::ColumnFamilyData::WriteStallCause write_stall_cause = + ROCKSDB_NAMESPACE::ColumnFamilyData::WriteStallCause::kNone; + + dbfull()->TEST_LockMutex(); + double divider = cfd->TEST_CalculateWriteDelayDivider( + compaction_needed_bytes, mutable_cf_options, write_stall_cause); + dbfull()->TEST_UnlockMutex(); + return divider; + } + std::vector<ColumnFamilyHandle*> handles_; std::vector<std::string> names_; std::vector<std::set<std::string>> keys_; @@ -556,6 +570,67 @@ INSTANTIATE_TEST_CASE_P(FormatDef, ColumnFamilyTest, INSTANTIATE_TEST_CASE_P(FormatLatest, ColumnFamilyTest, testing::Values(kLatestFormatVersion)); +#define CALL_WRAPPER(func) \ + func; \ + ASSERT_FALSE(HasFailure()); + +// The params for this suite are the Format Version and whether +// use_dynamic_delay is used +class ColumnFamilyTestWithDynamic + : public ColumnFamilyTestBase, + virtual public ::testing::WithParamInterface<std::tuple<uint32_t, bool>> { + public: + ColumnFamilyTestWithDynamic() + : ColumnFamilyTestBase(std::get<0>(GetParam())) {} + + double SetDelayAndCalculateRate(ColumnFamilyData* cfd, + uint64_t pending_bytes_to_set, + int times_delayed, + const MutableCFOptions& mutable_cf_options, + bool expected_is_db_write_stopped, + bool expected_needs_delay, int l0_files = 0) { + VersionStorageInfo* vstorage = cfd->current()->storage_info(); + vstorage->TEST_set_estimated_compaction_needed_bytes(pending_bytes_to_set); + if (l0_files > 0) { + 
vstorage->set_l0_delay_trigger_count(l0_files); + } + RecalculateWriteStallConditions(cfd, mutable_cf_options); + + CheckAssertions(expected_is_db_write_stopped, expected_needs_delay); + + double rate_divider = 0; + if (db_options_.use_dynamic_delay && expected_needs_delay) { + rate_divider = CalculateWriteDelayDivider( + cfd, vstorage->estimated_compaction_needed_bytes(), + mutable_cf_options); + } else { + rate_divider = 1; + for (int i = 0; i < times_delayed; i++) { + // each time SetupDelay is called the rate is divided by + // kIncSlowdownRatio (0.8) + rate_divider *= 1.25; + } + } + return rate_divider; + } + + void CheckAssertions(bool expected_is_db_write_stopped, + bool expected_needs_delay) { + ASSERT_TRUE(IsDbWriteStopped() == expected_is_db_write_stopped); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay() == + expected_needs_delay); + } +}; + +INSTANTIATE_TEST_CASE_P( + FormatDef, ColumnFamilyTestWithDynamic, + testing::Combine(testing::Values(test::kDefaultFormatVersion), + testing::Bool())); + +INSTANTIATE_TEST_CASE_P(FormatLatest, ColumnFamilyTestWithDynamic, + testing::Combine(testing::Values(kLatestFormatVersion), + testing::Bool())); + TEST_P(ColumnFamilyTest, DontReuseColumnFamilyID) { for (int iter = 0; iter < 3; ++iter) { Open(); @@ -2736,11 +2811,15 @@ TEST_P(ColumnFamilyTest, CreateAndDropRace) { } #endif // !ROCKSDB_LITE -TEST_P(ColumnFamilyTest, WriteStallSingleColumnFamily) { +namespace { +#define Gb *1073741824ull +} // namespace + +TEST_P(ColumnFamilyTestWithDynamic, WriteStallSingleColumnFamily) { const uint64_t kBaseRate = 800000u; db_options_.delayed_write_rate = kBaseRate; db_options_.max_background_compactions = 6; - + db_options_.use_dynamic_delay = std::get<1>(GetParam()); Open({"default"}); ColumnFamilyData* cfd = static_cast(db_->DefaultColumnFamily())->cfd(); @@ -2751,135 +2830,133 @@ TEST_P(ColumnFamilyTest, WriteStallSingleColumnFamily) { mutable_cf_options.level0_slowdown_writes_trigger = 20; 
mutable_cf_options.level0_stop_writes_trigger = 10000; - mutable_cf_options.soft_pending_compaction_bytes_limit = 200; - mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; + mutable_cf_options.soft_pending_compaction_bytes_limit = 200 Gb; + mutable_cf_options.hard_pending_compaction_bytes_limit = 2000 Gb; mutable_cf_options.disable_auto_compactions = false; - - vstorage->TEST_set_estimated_compaction_needed_bytes(50); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(201); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); + bool Stopped = true; + bool NotStopped = false; + bool Delayed = true; + bool NotDelayed = false; + double rate_divider; + + CALL_WRAPPER(SetDelayAndCalculateRate(cfd, 50 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, + NotDelayed)); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 201 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); - vstorage->TEST_set_estimated_compaction_needed_bytes(400); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 400 Gb, 1 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); - vstorage->TEST_set_estimated_compaction_needed_bytes(500); - 
RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(450); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(205); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(202); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(201); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(198); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(399); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(599); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - 
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(2001); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(IsDbWriteStopped()); - ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 500 Gb, 2 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 450 Gb, 1 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 205 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 202 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 201 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 198 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, NotDelayed)); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 399 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 599 Gb, 1 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + 
GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 2001 Gb, 0 /* times_delayed*/, + mutable_cf_options, Stopped, NotDelayed)); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); - vstorage->TEST_set_estimated_compaction_needed_bytes(3001); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(IsDbWriteStopped()); - ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(390); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(100); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); - - vstorage->set_l0_delay_trigger_count(100); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 3001 Gb, 0 /* times_delayed*/, + mutable_cf_options, Stopped, NotDelayed)); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 390 Gb, 1 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 100 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, NotDelayed)); + + rate_divider = CALL_WRAPPER(SetDelayAndCalculateRate( + cfd, 100 Gb, 0 /* times_delayed*/, mutable_cf_options, NotStopped, + Delayed, 100 /* l0_files*/)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); - 
vstorage->set_l0_delay_trigger_count(101); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); - - vstorage->set_l0_delay_trigger_count(0); - vstorage->TEST_set_estimated_compaction_needed_bytes(300); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate()); - - vstorage->set_l0_delay_trigger_count(101); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25 / 1.25 / 1.25, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(200); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate()); + rate_divider = CALL_WRAPPER(SetDelayAndCalculateRate( + cfd, 100 Gb, 1 /* times_delayed*/, mutable_cf_options, NotStopped, + Delayed, 101 /* l0_files*/)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER(SetDelayAndCalculateRate( + cfd, 300 Gb, 2 /* times_delayed*/, mutable_cf_options, NotStopped, + Delayed, 0 /* l0_files*/)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER(SetDelayAndCalculateRate( + cfd, 300 Gb, 3 /* times_delayed*/, mutable_cf_options, NotStopped, + Delayed, 101 /* l0_files*/)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 200 Gb, 2 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + 
ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); vstorage->set_l0_delay_trigger_count(0); - vstorage->TEST_set_estimated_compaction_needed_bytes(0); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 0 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, NotDelayed)); mutable_cf_options.disable_auto_compactions = true; dbfull()->TEST_write_controler().set_delayed_write_rate(kBaseRate); @@ -2887,39 +2964,37 @@ TEST_P(ColumnFamilyTest, WriteStallSingleColumnFamily) { ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); - vstorage->set_l0_delay_trigger_count(50); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + rate_divider = CALL_WRAPPER(SetDelayAndCalculateRate( + cfd, 0 Gb, 0 /* times_delayed*/, mutable_cf_options, NotStopped, + NotDelayed, 50 /* l0_files*/)); ASSERT_EQ(0, GetDbDelayedWriteRate()); - ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + dbfull()->TEST_write_controler().delayed_write_rate()); - vstorage->set_l0_delay_trigger_count(60); - vstorage->TEST_set_estimated_compaction_needed_bytes(300); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + rate_divider = CALL_WRAPPER(SetDelayAndCalculateRate( + cfd, 300 Gb, 0 /* times_delayed*/, mutable_cf_options, NotStopped, + NotDelayed, 60 /* l0_files*/)); ASSERT_EQ(0, GetDbDelayedWriteRate()); - ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + 
dbfull()->TEST_write_controler().delayed_write_rate()); mutable_cf_options.disable_auto_compactions = false; - vstorage->set_l0_delay_trigger_count(70); - vstorage->TEST_set_estimated_compaction_needed_bytes(500); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); - - vstorage->set_l0_delay_trigger_count(71); - vstorage->TEST_set_estimated_compaction_needed_bytes(501); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); + rate_divider = CALL_WRAPPER(SetDelayAndCalculateRate( + cfd, 500 Gb, 0 /* times_delayed*/, mutable_cf_options, NotStopped, + Delayed, 70 /* l0_files*/)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER(SetDelayAndCalculateRate( + cfd, 501 Gb, 1 /* times_delayed*/, mutable_cf_options, NotStopped, + Delayed, 71 /* l0_files*/)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); } -TEST_P(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) { +TEST_P(ColumnFamilyTestWithDynamic, CompactionSpeedupSingleColumnFamily) { db_options_.max_background_compactions = 6; + db_options_.use_dynamic_delay = std::get<1>(GetParam()); Open({"default"}); ColumnFamilyData* cfd = static_cast(db_->DefaultColumnFamily())->cfd(); @@ -2933,22 +3008,22 @@ TEST_P(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) { mutable_cf_options.level0_slowdown_writes_trigger = 36; mutable_cf_options.level0_stop_writes_trigger = 50; // Speedup threshold = 200 / 4 = 50 - mutable_cf_options.soft_pending_compaction_bytes_limit = 200; - mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; + mutable_cf_options.soft_pending_compaction_bytes_limit = 200 Gb; + 
mutable_cf_options.hard_pending_compaction_bytes_limit = 2000 Gb; - vstorage->TEST_set_estimated_compaction_needed_bytes(40); + vstorage->TEST_set_estimated_compaction_needed_bytes(40 Gb); RecalculateWriteStallConditions(cfd, mutable_cf_options); ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); - vstorage->TEST_set_estimated_compaction_needed_bytes(50); + vstorage->TEST_set_estimated_compaction_needed_bytes(50 Gb); RecalculateWriteStallConditions(cfd, mutable_cf_options); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); - vstorage->TEST_set_estimated_compaction_needed_bytes(300); + vstorage->TEST_set_estimated_compaction_needed_bytes(300 Gb); RecalculateWriteStallConditions(cfd, mutable_cf_options); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); - vstorage->TEST_set_estimated_compaction_needed_bytes(45); + vstorage->TEST_set_estimated_compaction_needed_bytes(45 Gb); RecalculateWriteStallConditions(cfd, mutable_cf_options); ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); @@ -2982,85 +3057,87 @@ TEST_P(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) { ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); } -TEST_P(ColumnFamilyTest, WriteStallTwoColumnFamilies) { +TEST_P(ColumnFamilyTestWithDynamic, WriteStallTwoColumnFamilies) { const uint64_t kBaseRate = 810000u; db_options_.delayed_write_rate = kBaseRate; + db_options_.use_dynamic_delay = std::get<1>(GetParam()); Open(); CreateColumnFamilies({"one"}); ColumnFamilyData* cfd = static_cast(db_->DefaultColumnFamily())->cfd(); - VersionStorageInfo* vstorage = cfd->current()->storage_info(); ColumnFamilyData* cfd1 = static_cast(handles_[1])->cfd(); - VersionStorageInfo* vstorage1 = cfd1->current()->storage_info(); MutableCFOptions mutable_cf_options(column_family_options_); mutable_cf_options.level0_slowdown_writes_trigger = 20; mutable_cf_options.level0_stop_writes_trigger = 10000; - mutable_cf_options.soft_pending_compaction_bytes_limit = 200; - 
mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; + mutable_cf_options.soft_pending_compaction_bytes_limit = 200 Gb; + mutable_cf_options.hard_pending_compaction_bytes_limit = 2000 Gb; MutableCFOptions mutable_cf_options1 = mutable_cf_options; - mutable_cf_options1.soft_pending_compaction_bytes_limit = 500; - - vstorage->TEST_set_estimated_compaction_needed_bytes(50); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); - - vstorage1->TEST_set_estimated_compaction_needed_bytes(201); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); - - vstorage1->TEST_set_estimated_compaction_needed_bytes(600); - RecalculateWriteStallConditions(cfd1, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(70); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); - - vstorage1->TEST_set_estimated_compaction_needed_bytes(800); - RecalculateWriteStallConditions(cfd1, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(300); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate()); - - vstorage1->TEST_set_estimated_compaction_needed_bytes(700); - RecalculateWriteStallConditions(cfd1, mutable_cf_options); - 
ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); - - vstorage->TEST_set_estimated_compaction_needed_bytes(500); - RecalculateWriteStallConditions(cfd, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate()); - - vstorage1->TEST_set_estimated_compaction_needed_bytes(600); - RecalculateWriteStallConditions(cfd1, mutable_cf_options); - ASSERT_TRUE(!IsDbWriteStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); - ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); + mutable_cf_options1.soft_pending_compaction_bytes_limit = 500 Gb; + bool NotStopped = false; + bool Delayed = true; + bool NotDelayed = false; + double rate_divider; + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 50 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, NotDelayed)); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd1, 201 Gb, 0 /* times_delayed*/, + mutable_cf_options1, NotStopped, NotDelayed)); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd1, 600 Gb, 0 /* times_delayed*/, + mutable_cf_options1, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 70 Gb, 0 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd1, 800 Gb, 1 /* times_delayed*/, + mutable_cf_options1, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 300 Gb, 2 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + 
ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd1, 700 Gb, 1 /* times_delayed*/, + mutable_cf_options1, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd, 500 Gb, 2 /* times_delayed*/, + mutable_cf_options, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); + + rate_divider = CALL_WRAPPER( + SetDelayAndCalculateRate(cfd1, 600 Gb, 1 /* times_delayed*/, + mutable_cf_options1, NotStopped, Delayed)); + ASSERT_EQ(static_cast(kBaseRate / rate_divider), + GetDbDelayedWriteRate()); } -TEST_P(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) { +TEST_P(ColumnFamilyTestWithDynamic, CompactionSpeedupTwoColumnFamilies) { db_options_.max_background_compactions = 6; column_family_options_.soft_pending_compaction_bytes_limit = 200; column_family_options_.hard_pending_compaction_bytes_limit = 2000; + db_options_.use_dynamic_delay = std::get<1>(GetParam()); Open(); CreateColumnFamilies({"one"}); ColumnFamilyData* cfd = @@ -3077,36 +3154,36 @@ TEST_P(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) { mutable_cf_options.level0_slowdown_writes_trigger = 36; mutable_cf_options.level0_stop_writes_trigger = 30; // Speedup threshold = 200 / 4 = 50 - mutable_cf_options.soft_pending_compaction_bytes_limit = 200; - mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; + mutable_cf_options.soft_pending_compaction_bytes_limit = 200 Gb; + mutable_cf_options.hard_pending_compaction_bytes_limit = 2000 Gb; MutableCFOptions mutable_cf_options1 = mutable_cf_options; mutable_cf_options1.level0_slowdown_writes_trigger = 16; - vstorage->TEST_set_estimated_compaction_needed_bytes(40); + vstorage->TEST_set_estimated_compaction_needed_bytes(40 Gb); RecalculateWriteStallConditions(cfd, mutable_cf_options); ASSERT_EQ(1, 
dbfull()->TEST_BGCompactionsAllowed()); - vstorage->TEST_set_estimated_compaction_needed_bytes(60); - RecalculateWriteStallConditions(cfd1, mutable_cf_options); + vstorage->TEST_set_estimated_compaction_needed_bytes(60 Gb); + RecalculateWriteStallConditions(cfd1, mutable_cf_options1); ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); RecalculateWriteStallConditions(cfd, mutable_cf_options); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); - vstorage1->TEST_set_estimated_compaction_needed_bytes(30); - RecalculateWriteStallConditions(cfd1, mutable_cf_options); + vstorage1->TEST_set_estimated_compaction_needed_bytes(30 Gb); + RecalculateWriteStallConditions(cfd1, mutable_cf_options1); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); - vstorage1->TEST_set_estimated_compaction_needed_bytes(70); - RecalculateWriteStallConditions(cfd1, mutable_cf_options); + vstorage1->TEST_set_estimated_compaction_needed_bytes(70 Gb); + RecalculateWriteStallConditions(cfd1, mutable_cf_options1); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); - vstorage->TEST_set_estimated_compaction_needed_bytes(20); + vstorage->TEST_set_estimated_compaction_needed_bytes(20 Gb); RecalculateWriteStallConditions(cfd, mutable_cf_options); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); - vstorage1->TEST_set_estimated_compaction_needed_bytes(3); - RecalculateWriteStallConditions(cfd1, mutable_cf_options); + vstorage1->TEST_set_estimated_compaction_needed_bytes(3 Gb); + RecalculateWriteStallConditions(cfd1, mutable_cf_options1); ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); vstorage->set_l0_delay_trigger_count(9); @@ -3114,7 +3191,7 @@ TEST_P(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) { ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); vstorage1->set_l0_delay_trigger_count(2); - RecalculateWriteStallConditions(cfd1, mutable_cf_options); + RecalculateWriteStallConditions(cfd1, mutable_cf_options1); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); 
vstorage->set_l0_delay_trigger_count(0); diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 772c52d110..af8560915c 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -211,6 +211,7 @@ class CompactionJobTestBase : public testing::Test { mutable_cf_options_(cf_options_), mutable_db_options_(), table_cache_(NewLRUCache(50000, 16)), + write_controller_(db_options_.use_dynamic_delay), write_buffer_manager_(db_options_.db_write_buffer_size), versions_(new VersionSet( dbname_, &db_options_, env_options_, table_cache_.get(), diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 9fcf3c9a5f..4474bb4fed 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -197,7 +197,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()), write_thread_(immutable_db_options_), nonmem_write_thread_(immutable_db_options_), - write_controller_(mutable_db_options_.delayed_write_rate), + write_controller_(immutable_db_options_.use_dynamic_delay, + mutable_db_options_.delayed_write_rate), last_batch_group_size_(0), unscheduled_flushes_(0), unscheduled_compactions_(0), diff --git a/db/db_options_test.cc b/db/db_options_test.cc index 212b8e8124..f1e9863e0f 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -826,6 +826,7 @@ TEST_F(DBOptionsTest, SanitizeDelayedWriteRate) { Options options; options.env = CurrentOptions().env; options.delayed_write_rate = 0; + options.use_dynamic_delay = false; Reopen(options); ASSERT_EQ(16 * 1024 * 1024, dbfull()->GetDBOptions().delayed_write_rate); diff --git a/db/db_test.cc b/db/db_test.cc index 52f34f0fa9..6ed35ea4b9 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -6471,6 +6471,7 @@ TEST_F(DBTest, DelayedWriteRate) { options.delayed_write_rate = 20000000; // Start with 200MB/s options.memtable_factory.reset( 
test::NewSpecialSkipListFactory(kEntriesPerMemTable)); + options.use_dynamic_delay = false; SetTimeElapseOnlySleepOnReopen(&options); CreateAndReopenWithCF({"pikachu"}, options); diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index f55726dad1..8c3ea9aad6 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -1259,7 +1259,7 @@ class RecoveryTestHelper { std::unique_ptr versions; std::unique_ptr wal_manager; - WriteController write_controller; + WriteController write_controller(db_options.use_dynamic_delay); versions.reset(new VersionSet( test->dbname_, &db_options, file_options, table_cache.get(), diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 4b35bc9b40..174ffc8d4e 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -40,6 +40,7 @@ class FlushJobTestBase : public testing::Test { db_options_(options_), column_family_names_({kDefaultColumnFamilyName, "foo", "bar"}), table_cache_(NewLRUCache(50000, 16)), + write_controller_(db_options_.use_dynamic_delay), write_buffer_manager_(db_options_.db_write_buffer_size), shutting_down_(false), mock_table_factory_(new mock::MockTableFactory()) {} diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index b72113f7d0..931aa88a9c 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -98,7 +98,8 @@ class MemTableListTest : public testing::Test { EnvOptions env_options; std::shared_ptr table_cache(NewLRUCache(50000, 16)); WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size); - WriteController write_controller(10000000u); + WriteController write_controller(immutable_db_options.use_dynamic_delay, + 10000000u); VersionSet versions(dbname, &immutable_db_options, env_options, table_cache.get(), &write_buffer_manager, @@ -149,7 +150,8 @@ class MemTableListTest : public testing::Test { EnvOptions env_options; std::shared_ptr table_cache(NewLRUCache(50000, 16)); WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size); - WriteController 
write_controller(10000000u); + WriteController write_controller(immutable_db_options.use_dynamic_delay, + 10000000u); VersionSet versions(dbname, &immutable_db_options, env_options, table_cache.get(), &write_buffer_manager, diff --git a/db/repair.cc b/db/repair.cc index 34da5ba05f..a34d3e7fb6 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -118,7 +118,7 @@ class Repairer { /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, db_session_id_)), wb_(db_options_.db_write_buffer_size), - wc_(db_options_.delayed_write_rate), + wc_(db_options_.use_dynamic_delay, db_options_.delayed_write_rate), vset_(dbname_, &immutable_db_options_, file_options_, raw_table_cache_.get(), &wb_, &wc_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, diff --git a/db/version_set.cc b/db/version_set.cc index d39b5d31e3..dc4a0d53d6 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -5798,7 +5798,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, ColumnFamilyOptions cf_options(*options); std::shared_ptr tc(NewLRUCache(options->max_open_files - 10, options->table_cache_numshardbits)); - WriteController wc(options->delayed_write_rate); + WriteController wc(db_options.use_dynamic_delay, options->delayed_write_rate); WriteBufferManager wb(options->db_write_buffer_size); VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc, nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/, diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 906938b3b5..1ec4063c58 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -1127,6 +1127,7 @@ class VersionSetTestBase { immutable_options_(db_options_, cf_options_), mutable_cf_options_(cf_options_), table_cache_(NewLRUCache(50000, 16)), + write_controller_(db_options_.use_dynamic_delay), write_buffer_manager_(db_options_.db_write_buffer_size), shutting_down_(false), mock_table_factory_(std::make_shared()) { diff --git a/db/version_util.h b/db/version_util.h index 6a809834dc..88dd709403 
100644 --- a/db/version_util.h +++ b/db/version_util.h @@ -18,7 +18,7 @@ namespace ROCKSDB_NAMESPACE { class OfflineManifestWriter { public: OfflineManifestWriter(const DBOptions& options, const std::string& db_path) - : wc_(options.delayed_write_rate), + : wc_(options.use_dynamic_delay, options.delayed_write_rate), wb_(options.db_write_buffer_size), immutable_db_options_(WithDbPath(options, db_path)), tc_(NewLRUCache(1 << 20 /* capacity */, diff --git a/db/wal_manager_test.cc b/db/wal_manager_test.cc index cbf866ce36..665ffa413a 100644 --- a/db/wal_manager_test.cc +++ b/db/wal_manager_test.cc @@ -34,6 +34,7 @@ class WalManagerTest : public testing::Test { WalManagerTest() : dbname_(test::PerThreadDBPath("wal_manager_test")), db_options_(), + write_controller_(db_options_.use_dynamic_delay), table_cache_(NewLRUCache(50000, 16)), write_buffer_manager_(db_options_.db_write_buffer_size), current_log_number_(0) { diff --git a/db/write_controller.h b/db/write_controller.h index c32b70b941..8fd8a36ce2 100644 --- a/db/write_controller.h +++ b/db/write_controller.h @@ -22,13 +22,15 @@ class WriteControllerToken; // to be called while holding DB mutex class WriteController { public: - explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u, + explicit WriteController(bool dynamic_delay, + uint64_t _delayed_write_rate = 1024u * 1024u * 32u, int64_t low_pri_rate_bytes_per_sec = 1024 * 1024) : total_stopped_(0), total_delayed_(0), total_compaction_pressure_(0), credit_in_bytes_(0), next_refill_time_(0), + dynamic_delay_(dynamic_delay), low_pri_rate_limiter_( NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) { set_max_delayed_write_rate(_delayed_write_rate); @@ -84,6 +86,8 @@ class WriteController { RateLimiter* low_pri_rate_limiter() { return low_pri_rate_limiter_.get(); } + bool is_dynamic_delay() const { return dynamic_delay_; } + private: uint64_t NowMicrosMonotonic(SystemClock* clock); @@ -104,6 +108,8 @@ class WriteController { uint64_t 
max_delayed_write_rate_; // Current write rate (bytes / second) uint64_t delayed_write_rate_; + // Whether Speedb's dynamic delay is used + bool dynamic_delay_; std::unique_ptr low_pri_rate_limiter_; }; diff --git a/db/write_controller_test.cc b/db/write_controller_test.cc index 1f7cf999aa..8ae3611935 100644 --- a/db/write_controller_test.cc +++ b/db/write_controller_test.cc @@ -21,7 +21,8 @@ class TimeSetClock : public SystemClockWrapper { uint64_t NowNanos() override { return now_micros_ * std::milli::den; } }; } // namespace -class WriteControllerTest : public testing::Test { +// The param is whether dynamic_delay is used or not +class WriteControllerTest : public testing::TestWithParam { public: WriteControllerTest() { clock_ = std::make_shared(); } std::shared_ptr clock_; @@ -33,8 +34,8 @@ class WriteControllerTest : public testing::Test { #define MBPS MILLION #define SECS MILLION // in microseconds -TEST_F(WriteControllerTest, BasicAPI) { - WriteController controller(40 MBPS); // also set max delayed rate +TEST_P(WriteControllerTest, BasicAPI) { + WriteController controller(GetParam(), 40 MBPS); // also set max delayed rate EXPECT_EQ(controller.delayed_write_rate(), 40 MBPS); EXPECT_FALSE(controller.IsStopped()); EXPECT_FALSE(controller.NeedsDelay()); @@ -106,8 +107,8 @@ TEST_F(WriteControllerTest, BasicAPI) { EXPECT_FALSE(controller.NeedsDelay()); } -TEST_F(WriteControllerTest, StartFilled) { - WriteController controller(10 MBPS); +TEST_P(WriteControllerTest, StartFilled) { + WriteController controller(GetParam(), 10 MBPS); // Attempt to write two things that combined would be allowed within // a single refill interval @@ -132,8 +133,8 @@ TEST_F(WriteControllerTest, StartFilled) { EXPECT_LT(1.0 * delay, 1.001 SECS); } -TEST_F(WriteControllerTest, DebtAccumulation) { - WriteController controller(10 MBPS); +TEST_P(WriteControllerTest, DebtAccumulation) { + WriteController controller(GetParam(), 10 MBPS); std::array, 10> tokens; @@ -192,8 +193,8 @@ 
TEST_F(WriteControllerTest, DebtAccumulation) { } // This may or may not be a "good" feature, but it's an old feature -TEST_F(WriteControllerTest, CreditAccumulation) { - WriteController controller(10 MBPS); +TEST_P(WriteControllerTest, CreditAccumulation) { + WriteController controller(GetParam(), 10 MBPS); std::array, 10> tokens; @@ -238,6 +239,7 @@ TEST_F(WriteControllerTest, CreditAccumulation) { tokens[0] = controller.GetDelayToken(1 MBPS); ASSERT_EQ(10 SECS, controller.GetDelay(clock_.get(), 10 MB)); } +INSTANTIATE_TEST_CASE_P(DynamicWC, WriteControllerTest, testing::Bool()); } // namespace ROCKSDB_NAMESPACE diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 2ef3adc52c..c1a60686d8 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -252,6 +252,7 @@ DECLARE_int32(get_property_one_in); DECLARE_string(file_checksum_impl); DECLARE_int32(data_block_index_type); DECLARE_double(data_block_hash_table_util_ratio); +DECLARE_bool(use_dynamic_delay); #ifndef ROCKSDB_LITE // Options for StackableDB-based BlobDB diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 9a4aad2ad2..469478cd20 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -1063,4 +1063,7 @@ DEFINE_uint64(stats_dump_period_sec, ROCKSDB_NAMESPACE::Options().stats_dump_period_sec, "Gap between printing stats to log in seconds"); +DEFINE_bool(use_dynamic_delay, ROCKSDB_NAMESPACE::Options().use_dynamic_delay, + "Use dynamic delay"); + #endif // GFLAGS diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 3153658945..96b15a1c9c 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -3126,6 +3126,7 @@ void InitializeOptionsFromFlags( FLAGS_verify_sst_unique_id_in_manifest; options.memtable_protection_bytes_per_key = FLAGS_memtable_protection_bytes_per_key; + 
options.use_dynamic_delay = FLAGS_use_dynamic_delay; // Integrated BlobDB options.enable_blob_files = FLAGS_enable_blob_files; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index acbc991677..9aa83cd12e 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1052,6 +1052,19 @@ struct DBOptions { // Dynamically changeable through SetDBOptions() API. uint64_t delayed_write_rate = 0; + // Use Speedb's dynamic delay - + // https://github.com/speedb-io/speedb/issues/276. Setting this to true, + // enables a different kind of calculation (instead of SetupDelay) for the + // delayed_write_rate whenever a call to RecalculateWriteStallConditions is + // made. the calculation itself is explained in the ticket and in the code of + // CalculateWriteDelayDividerAndMaybeUpdateWriteStallCause but in general its + // a linear decline of write speed with regards to by how much the system + // CURRENTLY exceeds the slowdown (soft_pending_compaction_bytes_limit and + // level0_slowdown_writes_trigger). + // + // Default: true + bool use_dynamic_delay = true; + // By default, a single write thread queue is maintained. The thread gets // to the head of the queue becomes write batch group leader and responsible // for writing to WAL and memtable for the batch group. 
diff --git a/options/db_options.cc b/options/db_options.cc index 840ade7ea7..b328f5761f 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -558,6 +558,10 @@ static std::unordered_map {offsetof(struct ImmutableDBOptions, enforce_single_del_contracts), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"use_dynamic_delay", + {offsetof(struct ImmutableDBOptions, use_dynamic_delay), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, }; const std::string OptionsHelper::kDBOptionsName = "DBOptions"; @@ -758,6 +762,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) checksum_handoff_file_types(options.checksum_handoff_file_types), lowest_used_cache_tier(options.lowest_used_cache_tier), compaction_service(options.compaction_service), + use_dynamic_delay(options.use_dynamic_delay), enforce_single_del_contracts(options.enforce_single_del_contracts) { fs = env->GetFileSystem(); clock = env->GetSystemClock().get(); @@ -844,6 +849,8 @@ void ImmutableDBOptions::Dump(Logger* log) const { is_fd_close_on_exec); ROCKS_LOG_HEADER(log, " Options.advise_random_on_open: %d", advise_random_on_open); + ROCKS_LOG_HEADER(log, " Options.use_dynamic_delay: %d", + use_dynamic_delay); ROCKS_LOG_HEADER( log, " Options.db_write_buffer_size: %" ROCKSDB_PRIszt, db_write_buffer_size); diff --git a/options/db_options.h b/options/db_options.h index 8946f60ff4..837127ce60 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -105,6 +105,7 @@ struct ImmutableDBOptions { Statistics* stats; Logger* logger; std::shared_ptr compaction_service; + bool use_dynamic_delay; bool enforce_single_del_contracts; bool IsWalDirSameAsDBPath() const; diff --git a/options/options_helper.cc b/options/options_helper.cc index efb1d382e8..f3f6d27cb8 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -135,6 +135,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, 
options.listeners = immutable_db_options.listeners; options.enable_thread_tracking = immutable_db_options.enable_thread_tracking; options.delayed_write_rate = mutable_db_options.delayed_write_rate; + options.use_dynamic_delay = immutable_db_options.use_dynamic_delay; options.enable_pipelined_write = immutable_db_options.enable_pipelined_write; options.unordered_write = immutable_db_options.unordered_write; options.allow_concurrent_memtable_write = diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 08e86e7fda..f6502f12cb 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -360,7 +360,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "db_host_id=hostname;" "lowest_used_cache_tier=kNonVolatileBlockTier;" "allow_data_in_errors=false;" - "enforce_single_del_contracts=false;", + "enforce_single_del_contracts=false;" + "use_dynamic_delay=true", new_options)); ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions), diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index c1a813420a..1b2475a1e5 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1522,6 +1522,9 @@ DEFINE_uint64(delayed_write_rate, 8388608u, "Limited bytes allowed to DB when soft_rate_limit or " "level0_slowdown_writes_trigger triggers"); +DEFINE_bool(use_dynamic_delay, ROCKSDB_NAMESPACE::Options().use_dynamic_delay, + "use dynamic delay"); + DEFINE_bool(enable_pipelined_write, true, "Allow WAL and memtable writes to be pipelined"); @@ -4818,6 +4821,7 @@ class Benchmark { options.inplace_update_num_locks = FLAGS_inplace_update_num_locks; options.enable_write_thread_adaptive_yield = FLAGS_enable_write_thread_adaptive_yield; + options.use_dynamic_delay = FLAGS_use_dynamic_delay; options.enable_pipelined_write = FLAGS_enable_pipelined_write; options.unordered_write = FLAGS_unordered_write; options.write_thread_max_yield_usec = FLAGS_write_thread_max_yield_usec; diff --git 
a/tools/db_crashtest.py b/tools/db_crashtest.py index 113f30f4c1..370878a94b 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -207,6 +207,7 @@ "customopspercent": 0, "filter_uri": lambda: random.choice(["speedb.PairedBloomFilter", ""]), "memtablerep": lambda: random.choice(["skip_list", "speedb.HashSpdRepFactory"]), + "use_dynamic_delay": lambda: random.choice([0, 1, 1, 1]), } _TEST_DIR_ENV_VAR = 'TEST_TMPDIR' diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 4adba6e9b1..ac7cf14201 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -1322,7 +1322,7 @@ void DumpManifestFile(Options options, std::string file, bool verbose, bool hex, // SanitizeOptions(), we need to initialize it manually. options.db_paths.emplace_back("dummy", 0); options.num_levels = 64; - WriteController wc(options.delayed_write_rate); + WriteController wc(options.use_dynamic_delay, options.delayed_write_rate); WriteBufferManager wb(options.db_write_buffer_size); ImmutableDBOptions immutable_db_options(options); VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc, @@ -1464,7 +1464,7 @@ Status GetLiveFilesChecksumInfoFromVersionSet(Options options, // SanitizeOptions(), we need to initialize it manually. 
options.db_paths.emplace_back(db_path, 0); options.num_levels = 64; - WriteController wc(options.delayed_write_rate); + WriteController wc(options.use_dynamic_delay, options.delayed_write_rate); WriteBufferManager wb(options.db_write_buffer_size); ImmutableDBOptions immutable_db_options(options); VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc, @@ -2272,7 +2272,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt, std::shared_ptr tc( NewLRUCache(opt.max_open_files - 10, opt.table_cache_numshardbits)); const InternalKeyComparator cmp(opt.comparator); - WriteController wc(opt.delayed_write_rate); + WriteController wc(opt.use_dynamic_delay, opt.delayed_write_rate); WriteBufferManager wb(opt.db_write_buffer_size); VersionSet versions(db_path_, &db_options, soptions, tc.get(), &wb, &wc, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, diff --git a/tools/ldb_cmd_test.cc b/tools/ldb_cmd_test.cc index cf133b7e47..e7ab162210 100644 --- a/tools/ldb_cmd_test.cc +++ b/tools/ldb_cmd_test.cc @@ -204,7 +204,7 @@ class FileChecksumTestHelper { options_.table_cache_numshardbits)); options_.db_paths.emplace_back(dbname_, 0); options_.num_levels = 64; - WriteController wc(options_.delayed_write_rate); + WriteController wc(options_.use_dynamic_delay, options_.delayed_write_rate); WriteBufferManager wb(options_.db_write_buffer_size); ImmutableDBOptions immutable_db_options(options_); VersionSet versions(dbname_, &immutable_db_options, sopt, tc.get(), &wb,