Skip to content

Commit

Permalink
live config changes: add RefreshOptions method (#294)
Browse files Browse the repository at this point in the history
Added a method and options to support reading configuration changes to options periodically while a process is running. Every "refresh_options_sec", the periodic task scheduler runs a task to check if the "refresh_options_file" exists. If it does, the file is loaded as an options file and the mutable options from the file are updated in the running process (via SetDBOptions and SetOptions). Once updated, the refresh file is deleted. If an error occurs, it will be written to the log file.

Note that the file is read as an Options file and must be a valid Options file. This means that the file must have a DBOptions section and at least one ColumnFamilyOptions section. Only options that are mutable can be in this file -- immutable options will cause an error and abort the processing.

Additionally, note that TableFactory options must be specified as part of the ColumnFamilyOptions. For example, to change the block size, the "table_factory.block_size" option should be set in the appropriate ColumnFamilyOptions. Options set in a TableFactory section will be ignored.

Currently tested manually via db_bench. Unit tests will be forthcoming.
  • Loading branch information
mrambacher authored and udi-speedb committed Nov 13, 2023
1 parent 72c16a9 commit 8554a92
Show file tree
Hide file tree
Showing 15 changed files with 469 additions and 1 deletion.
107 changes: 107 additions & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
periodic_task_functions_.emplace(
PeriodicTaskType::kRecordSeqnoTime,
[this]() { this->RecordSeqnoToTimeMapping(); });
periodic_task_functions_.emplace(PeriodicTaskType::kRefreshOptions,
[this]() { this->RefreshOptions(); });

versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_,
table_cache_.get(), write_buffer_manager_,
Expand Down Expand Up @@ -833,6 +835,15 @@ Status DBImpl::StartPeriodicTaskScheduler() {
return s;
}
}
if (mutable_db_options_.refresh_options_sec > 0) {
Status s = periodic_task_scheduler_.Register(
PeriodicTaskType::kRefreshOptions,
periodic_task_functions_.at(PeriodicTaskType::kRefreshOptions),
mutable_db_options_.refresh_options_sec);
if (!s.ok()) {
return s;
}
}

Status s = periodic_task_scheduler_.Register(
PeriodicTaskType::kFlushInfoLog,
Expand Down Expand Up @@ -1144,6 +1155,82 @@ void DBImpl::FlushInfoLog() {
LogFlush(immutable_db_options_.info_log);
}

#ifndef ROCKSDB_LITE
// Periodically checks to see if the new options should be loaded into the
// process. log.
void DBImpl::RefreshOptions() {
if (shutdown_initiated_) {
return;
}
std::string new_options_file = mutable_db_options_.refresh_options_file;
if (new_options_file.empty()) {
new_options_file = "Options.new";
}
if (new_options_file[0] != kFilePathSeparator) {
new_options_file = NormalizePath(immutable_db_options_.db_paths[0].path +
kFilePathSeparator + new_options_file);
}
TEST_SYNC_POINT("DBImpl::RefreshOptions::Start");
Status s = fs_->FileExists(new_options_file, IOOptions(), nullptr);
TEST_SYNC_POINT_CALLBACK("DBImpl::RefreshOptions::FileExists", &s);
if (!s.ok()) {
return;
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Refreshing Options from file: %s\n",
new_options_file.c_str());

ConfigOptions cfg_opts;
cfg_opts.ignore_unknown_options = true;
cfg_opts.mutable_options_only = true;
RocksDBOptionsParser op;
s = op.Parse(cfg_opts, new_options_file, fs_.get());
TEST_SYNC_POINT_CALLBACK("DBImpl::RefreshOptions::Parse", &s);
if (!s.ok()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Failed to parse Options file (%s): %s\n",
new_options_file.c_str(), s.ToString().c_str());
} else if (!op.db_opt_map()->empty()) {
s = SetDBOptions(*(op.db_opt_map()));
TEST_SYNC_POINT_CALLBACK("DBImpl::RefreshOptions::SetDBOptions", &s);
if (!s.ok()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Failed to refresh DBOptions, Aborting: %s\n",
s.ToString().c_str());
}
}
if (s.ok()) {
int idx = 0;
for (const auto& cf_opt_map : *(op.cf_opt_maps())) {
if (!cf_opt_map.empty()) {
const auto& cf_name = (*op.cf_names())[idx];
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(cf_name);
if (cfd == nullptr) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"RefreshOptions failed locating CF: %s\n",
cf_name.c_str());
} else if (!cfd->IsDropped()) {
s = SetCFOptionsImpl(cfd, cf_opt_map);
TEST_SYNC_POINT_CALLBACK("DBImpl::RefreshOptions::SetCFOptions", &s);
if (!s.ok()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Failed to refresh CFOptions for CF %s: %s\n",
cf_name.c_str(), s.ToString().c_str());
}
}
}
idx++;
}
}
s = fs_->DeleteFile(new_options_file, IOOptions(), nullptr);
TEST_SYNC_POINT_CALLBACK("DBImpl::RefreshOptions::DeleteFile", &s);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"RefreshOptions Complete, deleting options file %s: %s\n",
new_options_file.c_str(), s.ToString().c_str());
TEST_SYNC_POINT("DBImpl::RefreshOptions::Complete");
}
#endif // ROCKSDB_LITE

Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
int max_entries_to_print,
std::string* out_str) {
Expand Down Expand Up @@ -1192,7 +1279,14 @@ Status DBImpl::SetOptions(
cfd->GetName().c_str());
return Status::InvalidArgument("empty input");
}
return SetCFOptionsImpl(cfd, options_map);
#endif // ROCKSDB_LITE
}

#ifndef ROCKSDB_LITE
Status DBImpl::SetCFOptionsImpl(
ColumnFamilyData* cfd,
const std::unordered_map<std::string, std::string>& options_map) {
MutableCFOptions new_options;
Status s;
Status persist_options_status;
Expand Down Expand Up @@ -1242,6 +1336,7 @@ Status DBImpl::SetOptions(
LogFlush(immutable_db_options_.info_log);
return s;
}
#endif // ROCKSDB_LITE

Status DBImpl::SetDBOptions(
const std::unordered_map<std::string, std::string>& options_map) {
Expand Down Expand Up @@ -1345,6 +1440,18 @@ Status DBImpl::SetDBOptions(
new_options.stats_persist_period_sec);
}
}
if (s.ok()) {
if (new_options.refresh_options_sec == 0) {
s = periodic_task_scheduler_.Unregister(
PeriodicTaskType::kRefreshOptions);
} else {
s = periodic_task_scheduler_.Register(
PeriodicTaskType::kRefreshOptions,
periodic_task_functions_.at(PeriodicTaskType::kRefreshOptions),
new_options.refresh_options_sec);
}
}

mutex_.Lock();
if (!s.ok()) {
return s;
Expand Down
11 changes: 10 additions & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,11 @@ class DBImpl : public DB {
// record current sequence number to time mapping
void RecordSeqnoToTimeMapping();

#ifndef ROCKSDB_LITE
// Checks if the options should be updated
void RefreshOptions();
#endif // ROCKSDB_LITE

// Interface to block and signal the DB in case of stalling writes by
// WriteBufferManager. Each DBImpl object contains ptr to WBMStallInterface.
// When DB needs to be blocked or signalled by WriteBufferManager,
Expand Down Expand Up @@ -1821,7 +1826,11 @@ class DBImpl : public DB {
ColumnFamilyHandle** handle);

Status DropColumnFamilyImpl(ColumnFamilyHandle* column_family);

#ifndef ROCKSDB_LITE
Status SetCFOptionsImpl(
ColumnFamilyData* cfd,
const std::unordered_map<std::string, std::string>& options_map);
#endif // ROCKSDB_LITE
// Delete any unneeded files and stale in-memory entries.
void DeleteObsoleteFiles();
// Delete obsolete files and log status and information of file deletion
Expand Down
Loading

0 comments on commit 8554a92

Please sign in to comment.