Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a TablePinningPolicy to control how and when memory is pinned #459

Merged
merged 18 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,7 @@ set(SOURCES
table/block_based/partitioned_index_iterator.cc
table/block_based/partitioned_index_reader.cc
table/block_based/reader_common.cc
table/block_based/table_pinning_policy.cc
table/block_based/uncompression_dict_reader.cc
table/block_fetcher.cc
table/cuckoo/cuckoo_table_builder.cc
Expand Down
2 changes: 1 addition & 1 deletion HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Speedb Change Log

## Unreleased

* Add a TablePinningPolicy to the BlockBasedTableOptions. This class controls when blocks should be pinned in memory for a block based table. The default behavior uses the MetadataCacheOptions to control pinning and behaves identical to the previous releases.
### Enhancements
* db_bench: add estimate-table-readers-mem benchmark which prints these stats.

Expand Down
2 changes: 2 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"table/block_based/partitioned_index_iterator.cc",
"table/block_based/partitioned_index_reader.cc",
"table/block_based/reader_common.cc",
"table/block_based/table_pinning_policy.cc",
"table/block_based/uncompression_dict_reader.cc",
"table/block_fetcher.cc",
"table/compaction_merging_iterator.cc",
Expand Down Expand Up @@ -550,6 +551,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"table/block_based/partitioned_index_iterator.cc",
"table/block_based/partitioned_index_reader.cc",
"table/block_based/reader_common.cc",
"table/block_based/table_pinning_policy.cc",
"table/block_based/uncompression_dict_reader.cc",
"table/block_fetcher.cc",
"table/compaction_merging_iterator.cc",
Expand Down
2 changes: 1 addition & 1 deletion db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
*cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor,
env_options_, cfd_->internal_comparator(),
/*skip_filters*/ false, /*immortal*/ false,
/*force_direct_prefetch*/ false, /*level*/ -1,
/*force_direct_prefetch*/ false, /*level*/ -1, /*bottommost*/ false,
/*block_cache_tracer*/ nullptr,
/*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
/*cur_file_num*/ new_file_number),
Expand Down
2 changes: 1 addition & 1 deletion db/import_column_family_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ Status ImportColumnFamilyJob::GetIngestedFileInfo(
*cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor,
env_options_, cfd_->internal_comparator(),
/*skip_filters*/ false, /*immortal*/ false,
/*force_direct_prefetch*/ false, /*level*/ -1,
/*force_direct_prefetch*/ false, /*level*/ -1, /*bottommost*/ false,
/*block_cache_tracer*/ nullptr,
/*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
/*cur_file_num*/ new_file_number),
Expand Down
5 changes: 3 additions & 2 deletions db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,13 @@ Status TableCache::GetTableReader(
file->Hint(FSRandomAccessFile::kRandom);
}
StopWatch sw(ioptions_.clock, ioptions_.stats, TABLE_OPEN_IO_MICROS);
bool is_bottom = (level == ioptions_.num_levels - 1);
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(
std::move(file), fname, ioptions_.clock, io_tracer_,
record_read_stats ? ioptions_.stats : nullptr, SST_READ_MICROS,
file_read_hist, ioptions_.rate_limiter.get(), ioptions_.listeners,
file_temperature, level == ioptions_.num_levels - 1));
file_temperature, is_bottom));
UniqueId64x2 expected_unique_id;
if (ioptions_.verify_sst_unique_id_in_manifest) {
expected_unique_id = file_meta.unique_id;
Expand All @@ -140,7 +141,7 @@ Status TableCache::GetTableReader(
ro,
TableReaderOptions(ioptions_, prefix_extractor, file_options,
internal_comparator, skip_filters, immortal_tables_,
false /* force_direct_prefetch */, level,
false /* force_direct_prefetch */, level, is_bottom,
block_cache_tracer_, max_file_size_for_l0_meta_pin,
db_session_id_, file_meta.fd.GetNumber(),
expected_unique_id, file_meta.fd.largest_seqno),
Expand Down
1 change: 1 addition & 0 deletions db_stress_tool/db_stress_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ DECLARE_bool(compression_use_zstd_dict_trainer);
DECLARE_string(checksum_type);
DECLARE_string(env_uri);
DECLARE_string(fs_uri);
DECLARE_string(pinning_policy);
DECLARE_uint64(ops_per_thread);
DECLARE_uint64(log2_keys_per_lock);
DECLARE_uint64(max_manifest_file_size);
Expand Down
2 changes: 2 additions & 0 deletions db_stress_tool/db_stress_gflags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,8 @@ DEFINE_string(fs_uri, "",
" with --env_uri."
" Creates a default environment with the specified filesystem.");

DEFINE_string(pinning_policy, "", "URI for registry TablePinningPolicy");
Yuval-Ariel marked this conversation as resolved.
Show resolved Hide resolved

DEFINE_uint64(ops_per_thread, 1200000, "Number of operations per thread.");
static const bool FLAGS_ops_per_thread_dummy __attribute__((__unused__)) =
RegisterFlagValidator(&FLAGS_ops_per_thread, &ValidateUint32Range);
Expand Down
20 changes: 20 additions & 0 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "rocksdb/filter_policy.h"
#include "rocksdb/secondary_cache.h"
#include "rocksdb/sst_file_manager.h"
#include "rocksdb/table_pinning_policy.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/write_batch_with_index.h"
Expand Down Expand Up @@ -506,6 +507,12 @@ std::string StressTest::DebugString(const Slice& value,
void StressTest::PrintStatistics() {
if (dbstats) {
fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
const auto bbto =
options_.table_factory->GetOptions<BlockBasedTableOptions>();
if (bbto != nullptr && bbto->pinning_policy) {
fprintf(stdout, "PINNING STATISTICS:\n%s\n",
bbto->pinning_policy->ToString().c_str());
}
}
if (dbstats_secondaries) {
fprintf(stdout, "Secondary instances STATISTICS:\n%s\n",
Expand Down Expand Up @@ -3098,6 +3105,19 @@ void InitializeOptionsFromFlags(
block_based_options.max_auto_readahead_size = FLAGS_max_auto_readahead_size;
block_based_options.num_file_reads_for_auto_readahead =
FLAGS_num_file_reads_for_auto_readahead;
if (!FLAGS_pinning_policy.empty()) {
ConfigOptions config_options;
config_options.ignore_unknown_options = false;
config_options.ignore_unsupported_options = false;
Status s = TablePinningPolicy::CreateFromString(
config_options, FLAGS_pinning_policy,
&block_based_options.pinning_policy);
if (!s.ok()) {
fprintf(stderr, "Failed to create PinningPolicy: %s\n",
s.ToString().c_str());
exit(1);
}
}
options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));

// Write-Buffer-Manager
Expand Down
4 changes: 4 additions & 0 deletions include/rocksdb/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct TableReaderOptions;
struct TableBuilderOptions;
class TableBuilder;
class TableFactory;
class TablePinningPolicy;
class TableReader;
class WritableFileWriter;
struct ConfigOptions;
Expand Down Expand Up @@ -655,6 +656,9 @@ struct BlockBasedTableOptions {
//
// Default: 2
uint64_t num_file_reads_for_auto_readahead = 2;

// EXPERIMENTAL
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it valid to mark it as experimental?
What will be the behaviour when not specifiying a pinning policy?
Or is experminetal referring to policies that are not the default such as ours?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it valid to mark it as experimental? What will be the behaviour when not specifiying a pinning policy? Or is experminetal referring to policies that are not the default such as ours?

Experimental means that the interface is may change

std::shared_ptr<TablePinningPolicy> pinning_policy;
};

// Table Properties that are specific to block-based table properties.
Expand Down
118 changes: 118 additions & 0 deletions include/rocksdb/table_pinning_policy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
#pragma once

#include "rocksdb/customizable.h"
#include "rocksdb/status.h"

namespace ROCKSDB_NAMESPACE {

struct BlockBasedTableOptions;
struct ConfigOptions;

// Struct that contains information about the table being evaluated for pinning
struct TablePinningOptions {
TablePinningOptions() = default;

TablePinningOptions(int _level, bool _is_bottom, size_t _file_size,
size_t _max_file_size_for_l0_meta_pin)
: level(_level),
is_bottom(_is_bottom),
file_size(_file_size),
max_file_size_for_l0_meta_pin(_max_file_size_for_l0_meta_pin) {}
int level = -1;
bool is_bottom = false;
size_t file_size = 0;
size_t max_file_size_for_l0_meta_pin = 0;
};

// Struct containing information about an entry that has been pinned
struct PinnedEntry {
PinnedEntry() {}
PinnedEntry(int _level, uint8_t _type, size_t _size)
: level(_level), type(_type), size(_size) {}

int level = -1;
uint8_t type = 0;
size_t size = 0;
};

// TablePinningPolicy provides a configurable way to determine when blocks
// should be pinned in memory for the block based tables.
//
// Exceptions MUST NOT propagate out of overridden functions into RocksDB,
// because RocksDB is not exception-safe. This could cause undefined behavior
// including data loss, unreported corruption, deadlocks, and more.
class TablePinningPolicy : public Customizable {
public:
static const uint8_t kTopLevel = 1;
static const uint8_t kPartition = 2;
static const uint8_t kIndex = 3;
static const uint8_t kFilter = 4;
static const uint8_t kDictionary = 5;
static const char* Type() { return "TablePinningPolicy"; }

// Creates/Returns a new TablePinningPolicy based in the input value
static Status CreateFromString(const ConfigOptions& config_options,
const std::string& value,
std::shared_ptr<TablePinningPolicy>* policy);
virtual ~TablePinningPolicy() = default;

// Returns true if the block defined by type and size is a candidate for
// pinning This method indicates that pinning might be possible, but does not
// perform the pinning operation. Returns true if the data is a candidate for
// pinning and false otherwise
virtual bool MayPin(const TablePinningOptions& tpo, uint8_t type,
size_t size) const = 0;

// Attempts to pin the block in memory.
// If successful, pinned returns the pinned block
// Returns true and updates pinned on success and false if the data cannot be
// pinned
virtual bool PinData(const TablePinningOptions& tpo, uint8_t type,
size_t size, std::unique_ptr<PinnedEntry>* pinned) = 0;

// Releases and clears the pinned entry.
virtual void UnPinData(std::unique_ptr<PinnedEntry>&& pinned) = 0;

// Returns the amount of data currently pinned.
virtual size_t GetPinnedUsage() const = 0;

// Returns the info (e.g. statistics) associated with this policy.
virtual std::string ToString() const = 0;
};

class TablePinningPolicyWrapper : public TablePinningPolicy {
public:
explicit TablePinningPolicyWrapper(
const std::shared_ptr<TablePinningPolicy>& t)
: target_(t) {}
bool MayPin(const TablePinningOptions& tpo, uint8_t type,
size_t size) const override {
return target_->MayPin(tpo, type, size);
}

bool PinData(const TablePinningOptions& tpo, uint8_t type, size_t size,
std::unique_ptr<PinnedEntry>* pinned) override {
return target_->PinData(tpo, type, size, pinned);
}

void UnPinData(std::unique_ptr<PinnedEntry>&& pinned) override {
target_->UnPinData(std::move(pinned));
}

size_t GetPinnedUsage() const override { return target_->GetPinnedUsage(); }

protected:
std::shared_ptr<TablePinningPolicy> target_;
};

TablePinningPolicy* NewDefaultPinningPolicy(const BlockBasedTableOptions& bbto);
} // namespace ROCKSDB_NAMESPACE
32 changes: 31 additions & 1 deletion options/customizable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "rocksdb/slice_transform.h"
#include "rocksdb/sst_partitioner.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table_pinning_policy.h"
#include "rocksdb/utilities/customizable_util.h"
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/options_type.h"
Expand Down Expand Up @@ -1392,6 +1393,22 @@ class MockFilterPolicy : public FilterPolicy {
}
};

class MockTablePinningPolicy : public TablePinningPolicy {
public:
static const char* kClassName() { return "Mock"; }
const char* Name() const override { return kClassName(); }
bool MayPin(const TablePinningOptions&, uint8_t, size_t) const override {
return false;
}
bool PinData(const TablePinningOptions&, uint8_t, size_t,
std::unique_ptr<PinnedEntry>*) override {
return false;
}
void UnPinData(std::unique_ptr<PinnedEntry>&&) override {}
size_t GetPinnedUsage() const override { return 0; }
std::string ToString() const override { return ""; }
};

static int RegisterLocalObjects(ObjectLibrary& library,
const std::string& /*arg*/) {
size_t num_types;
Expand Down Expand Up @@ -1506,14 +1523,20 @@ static int RegisterLocalObjects(ObjectLibrary& library,
guard->reset(new MockTablePropertiesCollectorFactory());
return guard->get();
});

library.AddFactory<const FilterPolicy>(
MockFilterPolicy::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<const FilterPolicy>* guard,
std::string* /* errmsg */) {
guard->reset(new MockFilterPolicy());
return guard->get();
});
library.AddFactory<TablePinningPolicy>(
MockTablePinningPolicy::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<TablePinningPolicy>* guard,
std::string* /* errmsg */) {
guard->reset(new MockTablePinningPolicy());
return guard->get();
});

return static_cast<int>(library.GetFactoryCount(&num_types));
}
Expand Down Expand Up @@ -2108,6 +2131,13 @@ TEST_F(LoadCustomizableTest, LoadFlushBlockPolicyFactoryTest) {
}
}

TEST_F(LoadCustomizableTest, LoadTablePiningPolicyTest) {
ASSERT_OK(TestSharedBuiltins<TablePinningPolicy>("Mock", ""));
if (RegisterTests("Test")) {
ExpectCreateShared<TablePinningPolicy>("Mock");
}
}

} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
Expand Down
3 changes: 3 additions & 0 deletions options/options_settable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "options/db_options.h"
#include "options/options_helper.h"
#include "rocksdb/convenience.h"
#include "rocksdb/table_pinning_policy.h"
#include "test_util/testharness.h"

#ifndef GFLAGS
Expand Down Expand Up @@ -129,6 +130,8 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) {
sizeof(CacheUsageOptions)},
{offsetof(struct BlockBasedTableOptions, filter_policy),
sizeof(std::shared_ptr<const FilterPolicy>)},
{offsetof(struct BlockBasedTableOptions, pinning_policy),
sizeof(std::shared_ptr<TablePinningPolicy>)},
};

// In this test, we catch a new option of BlockBasedTableOptions that is not
Expand Down
5 changes: 3 additions & 2 deletions plugin/speedb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
set(speedb_SOURCES
speedb_registry.cc
memtable/hash_spd_rep.cc
paired_filter/speedb_paired_bloom.cc
paired_filter/speedb_paired_bloom_internal.cc)
paired_filter/speedb_paired_bloom.cc
paired_filter/speedb_paired_bloom_internal.cc
pinning_policy/scoped_pinning_policy.cc)

set(speedb_FUNC register_SpeedbPlugins)
Loading
Loading