Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More vote cache optimizations #4629

Merged
merged 8 commits into from
May 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,10 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
});

vote_router.vote_processed.add ([this] (std::shared_ptr<nano::vote> const & vote, nano::vote_source source, std::unordered_map<nano::block_hash, nano::vote_code> const & results) {
vote_cache.observe (vote, source, results);
if (source != nano::vote_source::cache)
{
vote_cache.insert (vote, results);
}
});

// Republish vote if it is new and the node does not host a principal representative (or close to)
Expand Down
143 changes: 67 additions & 76 deletions nano/node/vote_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ bool nano::vote_cache_entry::vote (std::shared_ptr<nano::vote> const & vote, con
bool updated = vote_impl (vote, rep_weight, max_voters);
if (updated)
{
auto [tally, final_tally] = calculate_tally ();
tally_m = tally;
final_tally_m = final_tally;
last_vote_m = std::chrono::steady_clock::now ();
}
return updated;
Expand All @@ -36,15 +39,12 @@ bool nano::vote_cache_entry::vote_impl (std::shared_ptr<nano::vote> const & vote
// It is not essential to keep tally up to date if rep voting weight changes, elections do tally calculations independently, so in the worst case scenario only our queue ordering will be a bit off
if (vote->timestamp () > existing->vote->timestamp ())
{
bool was_final = existing->vote->is_final ();
voters.modify (existing, [&vote, &rep_weight] (auto & existing) {
existing.vote = vote;
existing.weight = rep_weight;
});
return true;
}
else
{
return false;
return !was_final && vote->is_final (); // Tally changed only if the vote became final
}
}
else
Expand Down Expand Up @@ -76,35 +76,24 @@ bool nano::vote_cache_entry::vote_impl (std::shared_ptr<nano::vote> const & vote

return true;
}
else
{
return false;
}
}
return false; // Tally unchanged
}

std::size_t nano::vote_cache_entry::size () const
{
return voters.size ();
}

nano::block_hash nano::vote_cache_entry::hash () const
{
return hash_m;
}

nano::uint128_t nano::vote_cache_entry::tally () const
{
return std::accumulate (voters.begin (), voters.end (), nano::uint128_t{ 0 }, [] (auto sum, auto const & item) {
return sum + item.weight;
});
}

nano::uint128_t nano::vote_cache_entry::final_tally () const
auto nano::vote_cache_entry::calculate_tally () const -> std::pair<nano::uint128_t, nano::uint128_t>
{
return std::accumulate (voters.begin (), voters.end (), nano::uint128_t{ 0 }, [] (auto sum, auto const & item) {
return sum + (item.vote->is_final () ? item.weight : 0);
});
nano::uint128_t tally{ 0 }, final_tally{ 0 };
for (auto const & voter : voters)
{
tally += voter.weight;
final_tally += voter.vote->is_final () ? voter.weight : 0;
}
return { tally, final_tally };
}

std::vector<std::shared_ptr<nano::vote>> nano::vote_cache_entry::votes () const
Expand All @@ -113,11 +102,6 @@ std::vector<std::shared_ptr<nano::vote>> nano::vote_cache_entry::votes () const
return { r.begin (), r.end () };
}

std::chrono::steady_clock::time_point nano::vote_cache_entry::last_vote () const
{
return last_vote_m;
}

/*
* vote_cache
*/
Expand All @@ -128,61 +112,66 @@ nano::vote_cache::vote_cache (vote_cache_config const & config_a, nano::stats &
{
}

void nano::vote_cache::observe (const std::shared_ptr<nano::vote> & vote, nano::vote_source source, std::unordered_map<nano::block_hash, nano::vote_code> results)
void nano::vote_cache::insert (std::shared_ptr<nano::vote> const & vote, std::unordered_map<nano::block_hash, nano::vote_code> const & results)
{
if (source != nano::vote_source::cache)
{
insert (vote, [&results] (nano::block_hash const & hash) {
// This filters which hashes should be included in the vote cache
if (auto it = results.find (hash); it != results.end ())
{
auto result = it->second;
// Cache votes with a corresponding active election (indicated by `vote_code::vote`) in case that election gets dropped
return result == nano::vote_code::vote || result == nano::vote_code::indeterminate;
}
debug_assert (false);
return false;
});
}
}
// Results map should be empty or have the same hashes as the vote
debug_assert (results.empty () || std::all_of (vote->hashes.begin (), vote->hashes.end (), [&results] (auto const & hash) { return results.find (hash) != results.end (); }));

void nano::vote_cache::insert (std::shared_ptr<nano::vote> const & vote, std::function<bool (nano::block_hash const &)> filter)
{
auto const representative = vote->account;
auto const timestamp = vote->timestamp ();
auto const rep_weight = rep_weight_query (representative);

nano::lock_guard<nano::mutex> lock{ mutex };

for (auto const & hash : vote->hashes)
// Cache votes with a corresponding active election (indicated by `vote_code::vote`) in case that election gets dropped
auto filter = [] (auto code) {
return code == nano::vote_code::vote || code == nano::vote_code::indeterminate;
};

// If results map is empty, insert all hashes (meant for testing)
if (results.empty ())
{
// Using filter callback here to avoid unnecessary relocking when processing large votes
if (!filter (hash))
for (auto const & hash : vote->hashes)
{
continue;
insert_impl (vote, hash, rep_weight);
}

if (auto existing = cache.find (hash); existing != cache.end ())
}
else
{
for (auto const & [hash, code] : results)
{
stats.inc (nano::stat::type::vote_cache, nano::stat::detail::update);

cache.modify (existing, [this, &vote, &rep_weight] (entry & ent) {
ent.vote (vote, rep_weight, config.max_voters);
});
if (filter (code))
{
insert_impl (vote, hash, rep_weight);
}
}
else
{
stats.inc (nano::stat::type::vote_cache, nano::stat::detail::insert);
}
}

entry cache_entry{ hash };
cache_entry.vote (vote, rep_weight, config.max_voters);
cache.insert (cache_entry);
void nano::vote_cache::insert_impl (std::shared_ptr<nano::vote> const & vote, nano::block_hash const & hash, nano::uint128_t const & rep_weight)
{
debug_assert (!mutex.try_lock ());
debug_assert (std::any_of (vote->hashes.begin (), vote->hashes.end (), [&hash] (auto const & vote_hash) { return vote_hash == hash; }));

// Remove the oldest entry if we have reached the capacity limit
if (cache.size () > config.max_size)
{
cache.get<tag_sequenced> ().pop_front ();
}
if (auto existing = cache.find (hash); existing != cache.end ())
{
stats.inc (nano::stat::type::vote_cache, nano::stat::detail::update);

cache.modify (existing, [this, &vote, &rep_weight] (entry & ent) {
ent.vote (vote, rep_weight, config.max_voters);
});
}
else
{
stats.inc (nano::stat::type::vote_cache, nano::stat::detail::insert);

entry cache_entry{ hash };
cache_entry.vote (vote, rep_weight, config.max_voters);
cache.insert (cache_entry);

// Remove the oldest entry if we have reached the capacity limit
if (cache.size () > config.max_size)
{
cache.get<tag_sequenced> ().pop_front ();
}
}
}
Expand Down Expand Up @@ -231,11 +220,11 @@ void nano::vote_cache::clear ()
cache.clear ();
}

std::vector<nano::vote_cache::top_entry> nano::vote_cache::top (const nano::uint128_t & min_tally)
std::deque<nano::vote_cache::top_entry> nano::vote_cache::top (const nano::uint128_t & min_tally)
{
stats.inc (nano::stat::type::vote_cache, nano::stat::detail::top);

std::vector<top_entry> results;
std::deque<top_entry> results;
{
nano::lock_guard<nano::mutex> lock{ mutex };

Expand All @@ -244,12 +233,14 @@ std::vector<nano::vote_cache::top_entry> nano::vote_cache::top (const nano::uint
cleanup ();
}

for (auto & entry : cache)
for (auto & entry : cache.get<tag_tally> ())
{
if (entry.tally () >= min_tally)
auto tally = entry.tally ();
if (tally < min_tally)
{
results.push_back ({ entry.hash (), entry.tally (), entry.final_tally () });
break;
}
results.push_back ({ entry.hash (), tally, entry.final_tally () });
}
}

Expand Down
40 changes: 28 additions & 12 deletions nano/node/vote_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,29 @@ class vote_cache_entry final
bool vote (std::shared_ptr<nano::vote> const & vote, nano::uint128_t const & rep_weight, std::size_t max_voters);

std::size_t size () const;
nano::block_hash hash () const;
nano::uint128_t tally () const;
nano::uint128_t final_tally () const;
std::vector<std::shared_ptr<nano::vote>> votes () const;
std::chrono::steady_clock::time_point last_vote () const;

public: // Keep accessors inlined
nano::block_hash hash () const
{
return hash_m;
}
std::chrono::steady_clock::time_point last_vote () const
{
return last_vote_m;
}
nano::uint128_t tally () const
{
return tally_m;
}
nano::uint128_t final_tally () const
{
return final_tally_m;
}

private:
bool vote_impl (std::shared_ptr<nano::vote> const & vote, nano::uint128_t const & rep_weight, std::size_t max_voters);
std::pair<nano::uint128_t, nano::uint128_t> calculate_tally () const; // <tally, final_tally>

// clang-format off
class tag_representative {};
Expand All @@ -95,6 +110,8 @@ class vote_cache_entry final

nano::block_hash const hash_m;
std::chrono::steady_clock::time_point last_vote_m{};
nano::uint128_t tally_m{ 0 };
nano::uint128_t final_tally_m{ 0 };
};

class vote_cache final
Expand All @@ -110,12 +127,7 @@ class vote_cache final
*/
void insert (
std::shared_ptr<nano::vote> const & vote,
std::function<bool (nano::block_hash const &)> filter = [] (nano::block_hash const &) { return true; });

/**
* Should be called for every processed vote, filters which votes should be added to cache
*/
void observe (std::shared_ptr<nano::vote> const & vote, nano::vote_source source, std::unordered_map<nano::block_hash, nano::vote_code>);
std::unordered_map<nano::block_hash, nano::vote_code> const & results = {});

/**
* Tries to find an entry associated with block hash
Expand Down Expand Up @@ -145,7 +157,7 @@ class vote_cache final
* The blocks are sorted in descending order by final tally, then by tally
* @param min_tally minimum tally threshold, entries below with their voting weight below this will be ignored
*/
std::vector<top_entry> top (nano::uint128_t const & min_tally);
std::deque<top_entry> top (nano::uint128_t const & min_tally);

public: // Container info
std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name) const;
Expand All @@ -161,19 +173,23 @@ class vote_cache final
nano::stats & stats;

private:
void insert_impl (std::shared_ptr<nano::vote> const &, nano::block_hash const & hash, nano::uint128_t const & rep_weight);
void cleanup ();

// clang-format off
class tag_sequenced {};
class tag_hash {};
class tag_tally {};
// clang-format on

// clang-format off
using ordered_cache = boost::multi_index_container<entry,
mi::indexed_by<
mi::hashed_unique<mi::tag<tag_hash>,
mi::const_mem_fun<entry, nano::block_hash, &entry::hash>>,
mi::sequenced<mi::tag<tag_sequenced>>
mi::sequenced<mi::tag<tag_sequenced>>,
mi::ordered_non_unique<mi::tag<tag_tally>,
mi::const_mem_fun<entry, nano::uint128_t, &entry::tally>, std::greater<>> // DESC
>>;
// clang-format on
ordered_cache cache;
Expand Down
Loading
Loading