Redo of Index/Filter/Data blocks sizes in Block (LRU) Block Cache per CF after rebase on RocksDB 8.1 (#516) #620

Merged
1 change: 1 addition & 0 deletions HISTORY.md
@@ -5,6 +5,7 @@
### New Features
* Snapshot optimization - The most important piece of information in a snapshot is its sequence number, which lets compaction decide whether a key-value pair may be deleted. The sequence number changes only when the db is modified, so the db can take a new snapshot without acquiring the db mutex whenever the last snapshot has the same sequence number as the new one. In a transactional db with mostly read operations, this should improve performance in multithreaded environments, as well as in other scenarios that take a large number of snapshots under read-heavy workloads.
* Add a TablePinningPolicy to BlockBasedTableOptions. This class controls when blocks of a block-based table should be pinned in memory. The default behavior uses the MetadataCacheOptions to control pinning and behaves identically to previous releases.
* Redo of Index/Filter/Data block sizes in the Block (LRU) Block Cache per CF after the rebase on RocksDB 8.1. This was part of v2.3.0 and was broken by changes made in RocksDB. The feature reports, per CF, the size of its index / filter / data blocks in the block cache (only for LRUCache at the moment). The information is printed to the log, and the kBlockCacheCfStats and kFastBlockCacheCfStats properties were added so it can also be obtained programmatically (a usage sketch follows this hunk).

### Enhancements
* db_bench: add estimate-table-readers-mem benchmark which prints these stats.
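To make the new entry concrete, here is a minimal sketch of reading the per-CF stats programmatically. DB::GetMapProperty is the standard RocksDB API and kBlockCacheCfStats is the property this PR adds; the helper function itself is hypothetical.

#include <iostream>
#include <map>
#include <string>

#include "rocksdb/db.h"

// Illustrative helper: dump one CF's block-cache stats map. The map keys
// come from BlockCacheCfStatsMapKeys (see cache/cache_entry_roles.cc below).
void DumpBlockCacheCfStats(ROCKSDB_NAMESPACE::DB* db,
                           ROCKSDB_NAMESPACE::ColumnFamilyHandle* cf) {
  std::map<std::string, std::string> stats;
  if (db->GetMapProperty(
          cf, ROCKSDB_NAMESPACE::DB::Properties::kBlockCacheCfStats, &stats)) {
    for (const auto& kv : stats) {
      std::cout << kv.first << " = " << kv.second << "\n";
    }
  }
}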
50 changes: 50 additions & 0 deletions cache/cache.cc
@@ -155,4 +155,54 @@ void Cache::SetEvictionCallback(EvictionCallback&& fn) {
eviction_callback_ = std::move(fn);
}

// ==================================================================================================================================
Cache::ItemOwnerId Cache::ItemOwnerIdAllocator::Allocate() {
// In practice, owner-ids are allocated and freed when CFs
// are created and destroyed => relatively rare => we pay
// the price of always locking the mutex to keep the code simple
std::lock_guard<std::mutex> lock(free_ids_mutex_);

// First allocate from the free list if possible
if (!free_ids_.empty()) {
auto allocated_id = free_ids_.front();
free_ids_.pop_front();
return allocated_id;
}

// Nothing on the free list - try to allocate from the
// next item counter if not yet exhausted
if (has_wrapped_around_) {
// counter exhausted, allocation not possible
return kUnknownItemOwnerId;
}

auto allocated_id = next_item_owner_id_++;

if (allocated_id == kMaxItemOnwerId) {
has_wrapped_around_ = true;
}

return allocated_id;
}

void Cache::ItemOwnerIdAllocator::Free(ItemOwnerId* id) {
if (*id != kUnknownItemOwnerId) {
std::lock_guard<std::mutex> lock(free_ids_mutex_);
// If the free list is already at capacity, the freed id is simply
// lost - id recycling is a luxury feature not worth extra space.
if (free_ids_.size() < kMaxFreeItemOwnersIdListSize) {
free_ids_.push_back(*id);
}
*id = kUnknownItemOwnerId;
}
}

Cache::ItemOwnerId Cache::GetNextItemOwnerId() {
return owner_id_allocator_.Allocate();
}

void Cache::DiscardItemOwnerId(ItemOwnerId* item_owner_id) {
owner_id_allocator_.Free(item_owner_id);
}

} // namespace ROCKSDB_NAMESPACE
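As a usage sketch (not code from this PR), the allocator's round trip through the public Cache API added above looks like this; the cache construction and sizes are illustrative:

#include <cassert>
#include <memory>

#include "rocksdb/cache.h"

int main() {
  // Any shared cache works, but per-owner accounting is only wired up for
  // LRUCache at the moment.
  std::shared_ptr<ROCKSDB_NAMESPACE::Cache> cache =
      ROCKSDB_NAMESPACE::NewLRUCache(64 << 20);

  // One id per logical owner (this is what a CF does on creation). Once the
  // id space is exhausted and the free list is empty, kUnknownItemOwnerId
  // is returned.
  ROCKSDB_NAMESPACE::Cache::ItemOwnerId owner_id = cache->GetNextItemOwnerId();

  // ... tag inserts with owner_id so per-owner charges can be attributed ...

  // On owner teardown: best-effort recycling (the free list is capped at
  // kMaxFreeItemOwnersIdListSize) and the id is reset in place.
  cache->DiscardItemOwnerId(&owner_id);
  assert(owner_id == ROCKSDB_NAMESPACE::Cache::kUnknownItemOwnerId);
  return 0;
}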
15 changes: 15 additions & 0 deletions cache/cache_entry_roles.cc
@@ -101,4 +101,19 @@ std::string BlockCacheEntryStatsMapKeys::UsedPercent(CacheEntryRole role) {
return GetPrefixedCacheEntryRoleName(kPrefix, role);
}

const std::string& BlockCacheCfStatsMapKeys::CfName() {
static const std::string kCfName = "cf_name";
return kCfName;
}

const std::string& BlockCacheCfStatsMapKeys::CacheId() {
static const std::string kCacheId = "id";
return kCacheId;
}

std::string BlockCacheCfStatsMapKeys::UsedBytes(CacheEntryRole role) {
static const std::string kPrefix = "bytes.";
return GetPrefixedCacheEntryRoleName(kPrefix, role);
}

} // namespace ROCKSDB_NAMESPACE
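For illustration, these keys compose into map entries along the following lines; the exact role-name suffixes produced by GetPrefixedCacheEntryRoleName, and the header that declares BlockCacheCfStatsMapKeys, are assumed rather than quoted from the source:

#include <cstdint>
#include <map>
#include <string>

#include "rocksdb/cache.h"  // CacheEntryRole; BlockCacheCfStatsMapKeys added in this PR

using namespace ROCKSDB_NAMESPACE;

// Hypothetical population of one CF's stats map using the keys above; the
// role-name suffix ("data-block") is assumed.
std::map<std::string, std::string> MakeExampleCfStats() {
  std::map<std::string, std::string> cf_stats;
  cf_stats[BlockCacheCfStatsMapKeys::CfName()] = "default";          // "cf_name"
  cf_stats[BlockCacheCfStatsMapKeys::CacheId()] = "LRUCache@0x...";  // "id"
  cf_stats[BlockCacheCfStatsMapKeys::UsedBytes(CacheEntryRole::kDataBlock)] =
      std::to_string(uint64_t{1} << 20);  // e.g. "bytes.data-block" -> 1 MiB
  return cf_stats;
}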
3 changes: 2 additions & 1 deletion cache/cache_entry_stats.h
@@ -83,7 +83,8 @@ class CacheEntryStatsCollector {
last_start_time_micros_ = start_time_micros;
working_stats_.BeginCollection(cache_, clock_, start_time_micros);

cache_->ApplyToAllEntries(working_stats_.GetEntryCallback(), {});
cache_->ApplyToAllEntriesWithOwnerId(working_stats_.GetEntryCallback(),
{});
TEST_SYNC_POINT_CALLBACK(
"CacheEntryStatsCollector::GetStats:AfterApplyToAllEntries", nullptr);

17 changes: 14 additions & 3 deletions cache/clock_cache.cc
@@ -1056,10 +1056,11 @@ void ClockCacheShard<Table>::EraseUnRefEntries() {
}

template <class Table>
void ClockCacheShard<Table>::ApplyToSomeEntries(
void ClockCacheShard<Table>::ApplyToSomeEntriesWithOwnerId(
const std::function<void(const Slice& key, Cache::ObjectPtr value,
size_t charge,
const Cache::CacheItemHelper* helper)>& callback,
const Cache::CacheItemHelper* helper,
Cache::ItemOwnerId item_owner_id)>& callback,
size_t average_entries_per_lock, size_t* state) {
// The state is essentially going to be the starting hash, which works
// nicely even if we resize between calls because we use upper-most
@@ -1086,7 +1087,7 @@ void ClockCacheShard<Table>::ApplyToSomeEntries(
[callback](const HandleImpl& h) {
UniqueId64x2 unhashed;
callback(ReverseHash(h.hashed_key, &unhashed), h.value,
h.GetTotalCharge(), h.helper);
h.GetTotalCharge(), h.helper, Cache::kUnknownItemOwnerId);
},
index_begin, index_end, false);
}
@@ -1134,6 +1135,16 @@ Status ClockCacheShard<Table>::Insert(const Slice& key,
const Cache::CacheItemHelper* helper,
size_t charge, HandleImpl** handle,
Cache::Priority priority) {
return InsertWithOwnerId(key, hashed_key, value, helper, charge,
Cache::kUnknownItemOwnerId, handle, priority);
}

template <class Table>
Status ClockCacheShard<Table>::InsertWithOwnerId(
const Slice& key, const UniqueId64x2& hashed_key, Cache::ObjectPtr value,
const Cache::CacheItemHelper* helper, size_t charge,
Cache::ItemOwnerId /* item_owner_id */, HandleImpl** handle,
Cache::Priority priority) {
if (UNLIKELY(key.size() != kCacheKeySize)) {
return Status::NotSupported("ClockCache only supports key size " +
std::to_string(kCacheKeySize) + "B");
13 changes: 10 additions & 3 deletions cache/clock_cache.h
@@ -614,6 +614,12 @@ class ALIGN_AS(CACHE_LINE_SIZE) ClockCacheShard final : public CacheShardBase {
Cache::ObjectPtr value, const Cache::CacheItemHelper* helper,
size_t charge, HandleImpl** handle, Cache::Priority priority);

Status InsertWithOwnerId(const Slice& key, const UniqueId64x2& hashed_key,
Cache::ObjectPtr value,
const Cache::CacheItemHelper* helper, size_t charge,
Cache::ItemOwnerId /* item_owner_id */,
HandleImpl** handle, Cache::Priority priority);

HandleImpl* CreateStandalone(const Slice& key, const UniqueId64x2& hashed_key,
Cache::ObjectPtr obj,
const Cache::CacheItemHelper* helper,
@@ -643,10 +649,11 @@ class ALIGN_AS(CACHE_LINE_SIZE) ClockCacheShard final : public CacheShardBase {

size_t GetTableAddressCount() const;

void ApplyToSomeEntries(
const std::function<void(const Slice& key, Cache::ObjectPtr obj,
void ApplyToSomeEntriesWithOwnerId(
const std::function<void(const Slice& key, Cache::ObjectPtr value,
size_t charge,
const Cache::CacheItemHelper* helper)>& callback,
const Cache::CacheItemHelper* helper,
Cache::ItemOwnerId item_owner_id)>& callback,
size_t average_entries_per_lock, size_t* state);

void EraseUnRefEntries();
28 changes: 21 additions & 7 deletions cache/lru_cache.cc
@@ -165,10 +165,11 @@ void LRUCacheShard::EraseUnRefEntries() {
}
}

void LRUCacheShard::ApplyToSomeEntries(
void LRUCacheShard::ApplyToSomeEntriesWithOwnerId(
const std::function<void(const Slice& key, Cache::ObjectPtr value,
size_t charge,
const Cache::CacheItemHelper* helper)>& callback,
const Cache::CacheItemHelper* helper,
Cache::ItemOwnerId item_owner_id)>& callback,
size_t average_entries_per_lock, size_t* state) {
// The state is essentially going to be the starting hash, which works
// nicely even if we resize between calls because we use upper-most
@@ -196,7 +197,7 @@ void LRUCacheShard::ApplyToSomeEntries(
[callback,
metadata_charge_policy = metadata_charge_policy_](LRUHandle* h) {
callback(h->key(), h->value, h->GetCharge(metadata_charge_policy),
h->helper);
h->helper, h->item_owner_id);
},
index_begin, index_end);
}
Expand Down Expand Up @@ -518,7 +519,8 @@ bool LRUCacheShard::Release(LRUHandle* e, bool /*useful*/,
LRUHandle* LRUCacheShard::CreateHandle(const Slice& key, uint32_t hash,
Cache::ObjectPtr value,
const Cache::CacheItemHelper* helper,
size_t charge) {
size_t charge,
Cache::ItemOwnerId item_owner_id) {
assert(helper);
// value == nullptr is reserved for indicating failure in SecondaryCache
assert(!(helper->IsSecondaryCacheCompatible() && value == nullptr));
@@ -539,7 +541,7 @@ LRUHandle* LRUCacheShard::CreateHandle(const Slice& key, uint32_t hash,
e->next = e->prev = nullptr;
memcpy(e->key_data, key.data(), key.size());
e->CalcTotalCharge(charge, metadata_charge_policy_);

e->item_owner_id = item_owner_id;
return e;
}

@@ -548,7 +550,18 @@ Status LRUCacheShard::Insert(const Slice& key, uint32_t hash,
const Cache::CacheItemHelper* helper,
size_t charge, LRUHandle** handle,
Cache::Priority priority) {
LRUHandle* e = CreateHandle(key, hash, value, helper, charge);
return InsertWithOwnerId(key, hash, value, helper, charge,
Cache::kUnknownItemOwnerId, handle, priority);
}

Status LRUCacheShard::InsertWithOwnerId(const Slice& key, uint32_t hash,
Cache::ObjectPtr value,
const Cache::CacheItemHelper* helper,
size_t charge,
Cache::ItemOwnerId item_owner_id,
LRUHandle** handle,
Cache::Priority priority) {
LRUHandle* e = CreateHandle(key, hash, value, helper, charge, item_owner_id);
e->SetPriority(priority);
e->SetInCache(true);
return InsertItem(e, handle);
@@ -559,7 +572,8 @@ LRUHandle* LRUCacheShard::CreateStandalone(const Slice& key, uint32_t hash,
const Cache::CacheItemHelper* helper,
size_t charge,
bool allow_uncharged) {
LRUHandle* e = CreateHandle(key, hash, value, helper, charge);
LRUHandle* e = CreateHandle(key, hash, value, helper, charge,
Cache::kUnknownItemOwnerId);
e->SetIsStandalone(true);
e->Ref();

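As a hedged sketch of the caller side (not from this PR), tagging an insert so that CreateHandle stamps the handle as shown above:

#include "rocksdb/cache.h"

// Illustrative wrapper: insert `obj` under `owner_id` so its charge can
// later be attributed to that owner. Every other argument is what a caller
// would pass to plain Insert().
ROCKSDB_NAMESPACE::Status InsertTagged(
    ROCKSDB_NAMESPACE::Cache& cache, const ROCKSDB_NAMESPACE::Slice& key,
    ROCKSDB_NAMESPACE::Cache::ObjectPtr obj,
    const ROCKSDB_NAMESPACE::Cache::CacheItemHelper* helper, size_t charge,
    ROCKSDB_NAMESPACE::Cache::ItemOwnerId owner_id) {
  // With Cache::kUnknownItemOwnerId this degenerates to the plain Insert()
  // path, matching the forwarding wrappers in this diff.
  return cache.InsertWithOwnerId(key, obj, helper, charge, owner_id);
}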
15 changes: 12 additions & 3 deletions cache/lru_cache.h
@@ -59,6 +59,7 @@ struct LRUHandle {
uint32_t hash;
// The number of external refs to this entry. The cache itself is not counted.
uint32_t refs;
Cache::ItemOwnerId item_owner_id = Cache::kUnknownItemOwnerId;

// Mutable flags - access controlled by mutex
// The m_ and M_ prefixes (and im_ and IM_ later) are to hopefully avoid
@@ -302,6 +303,12 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShardBase {
const Cache::CacheItemHelper* helper, size_t charge,
LRUHandle** handle, Cache::Priority priority);

Status InsertWithOwnerId(const Slice& key, uint32_t hash,
Cache::ObjectPtr value,
const Cache::CacheItemHelper* helper, size_t charge,
Cache::ItemOwnerId /* item_owner_id */,
LRUHandle** handle, Cache::Priority priority);

LRUHandle* CreateStandalone(const Slice& key, uint32_t hash,
Cache::ObjectPtr obj,
const Cache::CacheItemHelper* helper,
@@ -325,10 +332,11 @@
size_t GetOccupancyCount() const;
size_t GetTableAddressCount() const;

void ApplyToSomeEntries(
void ApplyToSomeEntriesWithOwnerId(
const std::function<void(const Slice& key, Cache::ObjectPtr value,
size_t charge,
const Cache::CacheItemHelper* helper)>& callback,
const Cache::CacheItemHelper* helper,
Cache::ItemOwnerId item_owner_id)>& callback,
size_t average_entries_per_lock, size_t* state);

void EraseUnRefEntries();
@@ -373,7 +381,8 @@

LRUHandle* CreateHandle(const Slice& key, uint32_t hash,
Cache::ObjectPtr value,
const Cache::CacheItemHelper* helper, size_t charge);
const Cache::CacheItemHelper* helper, size_t charge,
Cache::ItemOwnerId item_owner_id);

// Initialized before use.
size_t capacity_;
31 changes: 28 additions & 3 deletions cache/sharded_cache.h
@@ -174,11 +174,19 @@ class ShardedCache : public ShardedCacheBase {
Status Insert(const Slice& key, ObjectPtr obj, const CacheItemHelper* helper,
size_t charge, Handle** handle = nullptr,
Priority priority = Priority::LOW) override {
return InsertWithOwnerId(key, obj, helper, charge, kUnknownItemOwnerId,
handle, priority);
}

Status InsertWithOwnerId(const Slice& key, ObjectPtr obj,
const CacheItemHelper* helper, size_t charge,
ItemOwnerId item_owner_id, Handle** handle = nullptr,
Priority priority = Priority::LOW) override {
assert(helper);
HashVal hash = CacheShard::ComputeHash(key);
auto h_out = reinterpret_cast<HandleImpl**>(handle);
return GetShard(hash).Insert(key, hash, obj, helper, charge, h_out,
priority);
return GetShard(hash).InsertWithOwnerId(key, hash, obj, helper, charge,
item_owner_id, h_out, priority);
}

Handle* CreateStandalone(const Slice& key, ObjectPtr obj,
@@ -235,6 +243,22 @@
const std::function<void(const Slice& key, ObjectPtr value, size_t charge,
const CacheItemHelper* helper)>& callback,
const ApplyToAllEntriesOptions& opts) override {
auto callback_with_owner_id =
[&callback](const Slice& key, ObjectPtr obj, size_t charge,
const CacheItemHelper* helper,
Cache::ItemOwnerId /* item_owner_id */) {
callback(key, obj, charge, helper);
};

ApplyToAllEntriesWithOwnerId(callback_with_owner_id, opts);
}

void ApplyToAllEntriesWithOwnerId(
const std::function<void(const Slice& key, ObjectPtr obj, size_t charge,
const CacheItemHelper* helper,
Cache::ItemOwnerId item_owner_id)>&
callback_with_owner_id,
const ApplyToAllEntriesOptions& opts) override {
uint32_t num_shards = GetNumShards();
// Iterate over part of each shard, rotating between shards, to
// minimize impact on latency of concurrent operations.
@@ -248,7 +272,8 @@
remaining_work = false;
for (uint32_t i = 0; i < num_shards; i++) {
if (states[i] != SIZE_MAX) {
shards_[i].ApplyToSomeEntries(callback, aepl, &states[i]);
shards_[i].ApplyToSomeEntriesWithOwnerId(callback_with_owner_id, aepl,
&states[i]);
remaining_work |= states[i] != SIZE_MAX;
}
}
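A sketch of how per-owner usage can be aggregated with the new traversal; the callback signature matches ApplyToAllEntriesWithOwnerId above, while the aggregation itself is illustrative rather than this PR's actual stats collection:

#include <unordered_map>

#include "rocksdb/cache.h"

// Illustrative: total cached bytes per owner id. Entries inserted without
// an owner id land in the kUnknownItemOwnerId bucket.
std::unordered_map<ROCKSDB_NAMESPACE::Cache::ItemOwnerId, size_t>
ChargePerOwner(ROCKSDB_NAMESPACE::Cache& cache) {
  std::unordered_map<ROCKSDB_NAMESPACE::Cache::ItemOwnerId, size_t> totals;
  cache.ApplyToAllEntriesWithOwnerId(
      [&totals](const ROCKSDB_NAMESPACE::Slice& /*key*/,
                ROCKSDB_NAMESPACE::Cache::ObjectPtr /*obj*/, size_t charge,
                const ROCKSDB_NAMESPACE::Cache::CacheItemHelper* /*helper*/,
                ROCKSDB_NAMESPACE::Cache::ItemOwnerId owner_id) {
        totals[owner_id] += charge;
      },
      {});
  return totals;
}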
8 changes: 5 additions & 3 deletions cache/typed_cache.h
@@ -301,13 +301,15 @@ class FullTypedCacheInterface
inline Status InsertFull(
const Slice& key, TValuePtr value, size_t charge,
TypedHandle** handle = nullptr, Priority priority = Priority::LOW,
CacheTier lowest_used_cache_tier = CacheTier::kNonVolatileBlockTier) {
CacheTier lowest_used_cache_tier = CacheTier::kNonVolatileBlockTier,
Cache::ItemOwnerId item_owner_id = Cache::kUnknownItemOwnerId) {
auto untyped_handle = reinterpret_cast<Handle**>(handle);
auto helper = lowest_used_cache_tier == CacheTier::kNonVolatileBlockTier
? GetFullHelper()
: GetBasicHelper();
return this->cache_->Insert(key, UpCastValue(value), helper, charge,
untyped_handle, priority);
return this->cache_->InsertWithOwnerId(key, UpCastValue(value), helper,
charge, item_owner_id,
untyped_handle, priority);
}

// Like SecondaryCache::InsertSaved, with SecondaryCache compatibility
11 changes: 11 additions & 0 deletions db/column_family.cc
@@ -649,6 +649,11 @@ ColumnFamilyData::ColumnFamilyData(
CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>>(
bbto->block_cache)));
}

if (bbto->block_cache && table_cache_) {
cache_owner_id_ = bbto->block_cache->GetNextItemOwnerId();
table_cache_->SetBlockCacheOwnerId(cache_owner_id_);
}
}
}

@@ -662,6 +667,12 @@ ColumnFamilyData::~ColumnFamilyData() {
prev->next_ = next;
next->prev_ = prev;

const BlockBasedTableOptions* bbto =
ioptions_.table_factory->GetOptions<BlockBasedTableOptions>();
if (bbto && bbto->block_cache) {
bbto->block_cache->DiscardItemOwnerId(&cache_owner_id_);
}

if (!dropped_ && column_family_set_ != nullptr) {
// If it's dropped, it's already removed from column family set
// If column_family_set_ == nullptr, this is dummy CFD and not in
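The constructor/destructor pairing above is a plain acquire/release; a hypothetical RAII guard (not part of this PR) captures the same pattern for any other owner of cache entries:

#include <memory>

#include "rocksdb/cache.h"

// Hypothetical wrapper mirroring ColumnFamilyData: allocate the owner id on
// construction, discard (and reset) it on destruction.
class ScopedCacheOwnerId {
 public:
  explicit ScopedCacheOwnerId(std::shared_ptr<ROCKSDB_NAMESPACE::Cache> cache)
      : cache_(std::move(cache)),
        id_(cache_ ? cache_->GetNextItemOwnerId()
                   : ROCKSDB_NAMESPACE::Cache::kUnknownItemOwnerId) {}
  ~ScopedCacheOwnerId() {
    if (cache_) {
      cache_->DiscardItemOwnerId(&id_);
    }
  }
  ScopedCacheOwnerId(const ScopedCacheOwnerId&) = delete;
  ScopedCacheOwnerId& operator=(const ScopedCacheOwnerId&) = delete;

  ROCKSDB_NAMESPACE::Cache::ItemOwnerId id() const { return id_; }

 private:
  std::shared_ptr<ROCKSDB_NAMESPACE::Cache> cache_;
  ROCKSDB_NAMESPACE::Cache::ItemOwnerId id_;
};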
4 changes: 4 additions & 0 deletions db/column_family.h
@@ -563,6 +563,8 @@ class ColumnFamilyData {
static constexpr uint64_t kLaggingFlushesThreshold = 10U;
void SetNumTimedQueuedForFlush(uint64_t num) { num_queued_for_flush_ = num; }

Cache::ItemOwnerId GetCacheOwnerId() const { return cache_owner_id_; }

// Allocate and return a new epoch number
uint64_t NewEpochNumber() { return next_epoch_number_.fetch_add(1); }

@@ -688,6 +690,8 @@
uint64_t num_queued_for_flush_ = 0U;

std::atomic<uint64_t> next_epoch_number_;

Cache::ItemOwnerId cache_owner_id_ = Cache::kUnknownItemOwnerId;
};

// ColumnFamilySet has interesting thread-safety requirements