Skip to content

Commit

Permalink
Options for file temperature for more files (#12957)
Browse files Browse the repository at this point in the history
Summary:
We have a request to use the cold tier as the primary source of truth for the DB. To best support such use cases, and to complement the existing options controlling SST file temperatures, we add two new DB options:
* `metadata_write_temperature` for DB "small" files that don't contain much user data
* `wal_write_temperature` for WALs.

Pull Request resolved: #12957

Test Plan: Unit test included, though it's hard to be sure we've covered all the places

Reviewed By: jowlyzhang

Differential Revision: D61664815

Pulled By: pdillinger

fbshipit-source-id: 8e19c9dd8fd2db059bb15f74938d6bc12002e82b
  • Loading branch information
pdillinger committed Aug 24, 2024
1 parent f172876 commit aaa570b
Show file tree
Hide file tree
Showing 19 changed files with 382 additions and 78 deletions.
6 changes: 4 additions & 2 deletions db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,8 @@ class CompactionJobTestBase : public testing::Test {
/*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
/*error_handler=*/nullptr, /*read_only=*/false));
compaction_job_stats_.Reset();
ASSERT_OK(SetIdentityFile(WriteOptions(), env_, dbname_));
ASSERT_OK(
SetIdentityFile(WriteOptions(), env_, dbname_, Temperature::kUnknown));

VersionEdit new_db;
new_db.SetLogNumber(0);
Expand All @@ -575,7 +576,8 @@ class CompactionJobTestBase : public testing::Test {
}
ASSERT_OK(s);
// Make "CURRENT" file that points to the new manifest file.
s = SetCurrentFile(WriteOptions(), fs_.get(), dbname_, 1, nullptr);
s = SetCurrentFile(WriteOptions(), fs_.get(), dbname_, 1,
Temperature::kUnknown, nullptr);

ASSERT_OK(s);

Expand Down
4 changes: 3 additions & 1 deletion db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,9 @@ Status DBImpl::SetupDBId(const WriteOptions& write_options, bool read_only,
}
// Persist it to IDENTITY file if allowed
if (!read_only) {
s = SetIdentityFile(write_options, env_, dbname_, db_id_);
s = SetIdentityFile(write_options, env_, dbname_,
immutable_db_options_.metadata_write_temperature,
db_id_);
}
return s;
}
Expand Down
14 changes: 13 additions & 1 deletion db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ Status DBImpl::ValidateOptions(const DBOptions& db_options) {
Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
VersionEdit new_db;
const WriteOptions write_options(Env::IOActivity::kDBOpen);
Status s = SetIdentityFile(write_options, env_, dbname_);
Status s = SetIdentityFile(write_options, env_, dbname_,
immutable_db_options_.metadata_write_temperature);
if (!s.ok()) {
return s;
}
Expand All @@ -319,6 +320,12 @@ Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
}
std::unique_ptr<FSWritableFile> file;
FileOptions file_options = fs_->OptimizeForManifestWrite(file_options_);
// DB option takes precedence when not kUnknown
if (immutable_db_options_.metadata_write_temperature !=
Temperature::kUnknown) {
file_options.temperature =
immutable_db_options_.metadata_write_temperature;
}
s = NewWritableFile(fs_.get(), manifest, &file, file_options);
if (!s.ok()) {
return s;
Expand All @@ -344,6 +351,7 @@ Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
if (s.ok()) {
// Make "CURRENT" file that points to the new manifest file.
s = SetCurrentFile(write_options, fs_.get(), dbname_, 1,
immutable_db_options_.metadata_write_temperature,
directories_.GetDbDir());
if (new_filenames) {
new_filenames->emplace_back(
Expand Down Expand Up @@ -1936,6 +1944,10 @@ IOStatus DBImpl::CreateWAL(const WriteOptions& write_options,
BuildDBOptions(immutable_db_options_, mutable_db_options_);
FileOptions opt_file_options =
fs_->OptimizeForLogWrite(file_options_, db_options);
// DB option takes precedence when not kUnknown
if (immutable_db_options_.wal_write_temperature != Temperature::kUnknown) {
opt_file_options.temperature = immutable_db_options_.wal_write_temperature;
}
std::string wal_dir = immutable_db_options_.GetWalDir();
std::string log_fname = LogFileName(wal_dir, log_file_num);

Expand Down
230 changes: 230 additions & 0 deletions db/db_test2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <atomic>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>

#include "db/db_test_util.h"
Expand All @@ -26,6 +27,7 @@
#include "rocksdb/utilities/replayer.h"
#include "rocksdb/wal_filter.h"
#include "test_util/testutil.h"
#include "util/defer.h"
#include "util/random.h"
#include "utilities/fault_injection_env.h"

Expand Down Expand Up @@ -6544,6 +6546,234 @@ TEST_P(RenameCurrentTest, Compaction) {
ASSERT_EQ("d_value", Get("d"));
}

TEST_F(DBTest2, VariousFileTemperatures) {
constexpr size_t kNumberFileTypes = static_cast<size_t>(kBlobFile) + 1U;

struct MyTestFS : public FileTemperatureTestFS {
explicit MyTestFS(const std::shared_ptr<FileSystem>& fs)
: FileTemperatureTestFS(fs) {
Reset();
}

IOStatus NewWritableFile(const std::string& fname, const FileOptions& opts,
std::unique_ptr<FSWritableFile>* result,
IODebugContext* dbg) override {
IOStatus ios =
FileTemperatureTestFS::NewWritableFile(fname, opts, result, dbg);
if (ios.ok()) {
uint64_t number;
FileType type;
if (ParseFileName(GetFileName(fname), &number, "LOG", &type)) {
if (type == kTableFile) {
// Not checked here
} else if (type == kWalFile) {
if (opts.temperature != expected_wal_temperature) {
std::cerr << "Attempt to open " << fname << " with temperature "
<< temperature_to_string[opts.temperature]
<< " rather than "
<< temperature_to_string[expected_wal_temperature]
<< std::endl;
assert(false);
}
} else if (type == kDescriptorFile) {
if (opts.temperature != expected_manifest_temperature) {
std::cerr << "Attempt to open " << fname << " with temperature "
<< temperature_to_string[opts.temperature]
<< " rather than "
<< temperature_to_string[expected_wal_temperature]
<< std::endl;
assert(false);
}
} else if (opts.temperature != expected_other_metadata_temperature) {
std::cerr << "Attempt to open " << fname << " with temperature "
<< temperature_to_string[opts.temperature]
<< " rather than "
<< temperature_to_string[expected_wal_temperature]
<< std::endl;
assert(false);
}
UpdateCount(type, 1);
}
}
return ios;
}

IOStatus RenameFile(const std::string& src, const std::string& dst,
const IOOptions& options,
IODebugContext* dbg) override {
IOStatus ios = FileTemperatureTestFS::RenameFile(src, dst, options, dbg);
if (ios.ok()) {
uint64_t number;
FileType src_type;
FileType dst_type;
assert(ParseFileName(GetFileName(src), &number, "LOG", &src_type));
assert(ParseFileName(GetFileName(dst), &number, "LOG", &dst_type));

UpdateCount(src_type, -1);
UpdateCount(dst_type, 1);
}
return ios;
}

void UpdateCount(FileType type, int delta) {
size_t i = static_cast<size_t>(type);
assert(i < kNumberFileTypes);
counts[i].FetchAddRelaxed(delta);
}

std::map<FileType, size_t> PopCounts() {
std::map<FileType, size_t> ret;
for (size_t i = 0; i < kNumberFileTypes; ++i) {
int c = counts[i].ExchangeRelaxed(0);
if (c > 0) {
ret[static_cast<FileType>(i)] = c;
}
}
return ret;
}

FileOptions OptimizeForLogWrite(
const FileOptions& file_options,
const DBOptions& /*db_options*/) const override {
FileOptions opts = file_options;
if (optimize_wal_temperature != Temperature::kUnknown) {
opts.temperature = optimize_wal_temperature;
}
return opts;
}

FileOptions OptimizeForManifestWrite(
const FileOptions& file_options) const override {
FileOptions opts = file_options;
if (optimize_manifest_temperature != Temperature::kUnknown) {
opts.temperature = optimize_manifest_temperature;
}
return opts;
}

void Reset() {
optimize_manifest_temperature = Temperature::kUnknown;
optimize_wal_temperature = Temperature::kUnknown;
expected_manifest_temperature = Temperature::kUnknown;
expected_other_metadata_temperature = Temperature::kUnknown;
expected_wal_temperature = Temperature::kUnknown;
for (auto& c : counts) {
c.StoreRelaxed(0);
}
}

Temperature optimize_manifest_temperature;
Temperature optimize_wal_temperature;
Temperature expected_manifest_temperature;
Temperature expected_other_metadata_temperature;
Temperature expected_wal_temperature;
std::array<RelaxedAtomic<int>, kNumberFileTypes> counts;
};

// We don't have enough non-unknown temps to confidently distinguish that
// a specific setting caused a specific outcome, in a single run. This is a
// reasonable work-around without blowing up test time. Only returns
// non-unknown temperatures.
auto RandomTemp = [] {
static std::vector<Temperature> temps = {
Temperature::kHot, Temperature::kWarm, Temperature::kCold};
return temps[Random::GetTLSInstance()->Uniform(
static_cast<int>(temps.size()))];
};

// Wrap the default file system in the checking/counting MyTestFS and build an
// Env on top of it for the DB under test.
auto test_fs = std::make_shared<MyTestFS>(env_->GetFileSystem());
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, test_fs));
// Exercise all four combinations of:
//  * use_optimize: the FileSystem suggests WAL / MANIFEST temperatures via
//    its OptimizeForLogWrite / OptimizeForManifestWrite hooks
//  * use_temp_options: the DB temperature options are set explicitly
for (bool use_optimize : {false, true}) {
std::cerr << "use_optimize: " << std::to_string(use_optimize) << std::endl;
for (bool use_temp_options : {false, true}) {
std::cerr << "use_temp_options: " << std::to_string(use_temp_options)
<< std::endl;

Options options = CurrentOptions();
// Universal compaction is currently required for last_level_temperature
options.compaction_style = kCompactionStyleUniversal;
options.env = env.get();
// Start each combination from kUnknown expectations and zeroed counters.
test_fs->Reset();
if (use_optimize) {
// The FS-suggested temperature is what we expect to see, unless a DB
// option overrides it below.
test_fs->optimize_manifest_temperature = RandomTemp();
test_fs->expected_manifest_temperature =
test_fs->optimize_manifest_temperature;
test_fs->optimize_wal_temperature = RandomTemp();
test_fs->expected_wal_temperature = test_fs->optimize_wal_temperature;
}
if (use_temp_options) {
// Assigned after the use_optimize block on purpose: when both are set,
// the DB options replace the FS-suggested expectations.
options.metadata_write_temperature = RandomTemp();
test_fs->expected_manifest_temperature =
options.metadata_write_temperature;
test_fs->expected_other_metadata_temperature =
options.metadata_write_temperature;
options.wal_write_temperature = RandomTemp();
test_fs->expected_wal_temperature = options.wal_write_temperature;
options.last_level_temperature = RandomTemp();
options.default_write_temperature = RandomTemp();
}

DestroyAndReopen(options);
// NOTE(review): presumably runs Close() when `closer` goes out of scope at
// the end of this iteration (see util/defer.h) - confirm.
Defer closer([&] { Close(); });

// File type count map
using FTC = std::map<FileType, size_t>;
// Files on DB startup
// NOTE(review): counts are net of renames (see MyTestFS::RenameFile), so a
// file written under one type and renamed to another is tallied under the
// destination type only.
ASSERT_EQ(test_fs->PopCounts(), FTC({{kWalFile, 1},
{kDescriptorFile, 2},
{kCurrentFile, 2},
{kIdentityFile, 1},
{kOptionsFile, 1}}));

// Temperature count map
using TCM = std::map<Temperature, size_t>;
// No SST files yet
ASSERT_EQ(test_fs->CountCurrentSstFilesByTemp(), TCM({}));

// Two flushes -> two SST files
ASSERT_OK(Put("foo", "1"));
ASSERT_OK(Put("bar", "1"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo", "2"));
ASSERT_OK(Put("bar", "2"));
ASSERT_OK(Flush());

// Both flushed files are expected at default_write_temperature
ASSERT_EQ(test_fs->CountCurrentSstFilesByTemp(),
TCM({{options.default_write_temperature, 2}}));

ASSERT_OK(db_->CompactRange({}, nullptr, nullptr));

// After full compaction, a single file at last_level_temperature is expected
ASSERT_EQ(test_fs->CountCurrentSstFilesByTemp(),
TCM({{options.last_level_temperature, 1}}));

ASSERT_OK(Put("foo", "3"));
ASSERT_OK(Put("bar", "3"));
ASSERT_OK(Flush());

// Just in memtable/WAL
ASSERT_OK(Put("dog", "3"));

{
// Built with += rather than an initializer list so the expectation is
// still correct if the two random temperatures happen to be equal.
TCM expected;
expected[options.default_write_temperature] += 1;
expected[options.last_level_temperature] += 1;
ASSERT_EQ(test_fs->CountCurrentSstFilesByTemp(), expected);
}

// New files during operation
ASSERT_EQ(test_fs->PopCounts(), FTC({{kWalFile, 3}, {kTableFile, 4}}));

Reopen(options);

// New files during re-open/recovery
ASSERT_EQ(test_fs->PopCounts(), FTC({{kWalFile, 1},
{kTableFile, 1},
{kDescriptorFile, 1},
{kCurrentFile, 1},
{kOptionsFile, 1}}));

Destroy(options);
}
}
}

TEST_F(DBTest2, LastLevelTemperature) {
class TestListener : public EventListener {
public:
Expand Down
11 changes: 10 additions & 1 deletion db/db_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,15 @@ class FileTemperatureTestFS : public FileSystemWrapper {
return count;
}

// Builds a histogram over current_sst_file_temperatures_: for each
// temperature that appears there, the number of entries recorded with it.
// Temperatures with no entries are absent from the result. mu_ is held while
// the map is scanned.
std::map<Temperature, size_t> CountCurrentSstFilesByTemp() {
  std::map<Temperature, size_t> histogram;
  MutexLock guard(&mu_);
  for (const auto& number_and_temp : current_sst_file_temperatures_) {
    histogram[number_and_temp.second] += 1;
  }
  return histogram;
}

void OverrideSstFileTemperature(uint64_t number, Temperature temp) {
MutexLock lock(&mu_);
current_sst_file_temperatures_[number] = temp;
Expand All @@ -842,7 +851,7 @@ class FileTemperatureTestFS : public FileSystemWrapper {
requested_sst_file_temperatures_;
std::map<uint64_t, Temperature> current_sst_file_temperatures_;

std::string GetFileName(const std::string& fname) {
static std::string GetFileName(const std::string& fname) {
auto filename = fname.substr(fname.find_last_of(kFilePathSeparator) + 1);
// workaround only for Windows that the file path could contain both Windows
// FilePathSeparator and '/'
Expand Down
6 changes: 4 additions & 2 deletions db/flush_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ class FlushJobTestBase : public testing::Test {
}

void NewDB() {
ASSERT_OK(SetIdentityFile(WriteOptions(), env_, dbname_));
ASSERT_OK(
SetIdentityFile(WriteOptions(), env_, dbname_, Temperature::kUnknown));
VersionEdit new_db;

new_db.SetLogNumber(0);
Expand Down Expand Up @@ -114,7 +115,8 @@ class FlushJobTestBase : public testing::Test {
}
ASSERT_OK(s);
// Make "CURRENT" file that points to the new manifest file.
s = SetCurrentFile(WriteOptions(), fs_.get(), dbname_, 1, nullptr);
s = SetCurrentFile(WriteOptions(), fs_.get(), dbname_, 1,
Temperature::kUnknown, nullptr);
ASSERT_OK(s);
}

Expand Down
10 changes: 7 additions & 3 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5511,6 +5511,10 @@ Status VersionSet::ProcessManifestWrites(
std::unique_ptr<log::Writer> new_desc_log_ptr;
{
FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
// DB option (in file_options_) takes precedence when not kUnknown
if (file_options_.temperature != Temperature::kUnknown) {
opt_file_opts.temperature = file_options_.temperature;
}
mu->Unlock();
TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestStart");
TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WriteManifest", nullptr);
Expand Down Expand Up @@ -5637,9 +5641,9 @@ Status VersionSet::ProcessManifestWrites(
assert(manifest_io_status.ok());
}
if (s.ok() && new_descriptor_log) {
io_s = SetCurrentFile(write_options, fs_.get(), dbname_,
pending_manifest_file_number_,
dir_contains_current_file);
io_s = SetCurrentFile(
write_options, fs_.get(), dbname_, pending_manifest_file_number_,
file_options_.temperature, dir_contains_current_file);
if (!io_s.ok()) {
s = io_s;
// Quarantine old manifest file in case new manifest file's CURRENT file
Expand Down
Loading

0 comments on commit aaa570b

Please sign in to comment.