Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental backoff for local_block_broadcaster #4662

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3859,3 +3859,42 @@ TEST (node, process_local_overflow)
auto result = node.process_local (send1);
ASSERT_FALSE (result);
}

// Verify that a block processed locally is periodically rebroadcast by the
// local_block_broadcaster and eventually reaches a freshly connected peer,
// even though no election exists to flood it.
TEST (node, local_block_broadcast)
{
	nano::test::system system;

	// Disable active elections to prevent the block from being broadcasted by the election
	auto config = system.default_config ();
	config.active_elections.size = 0;
	config.local_block_broadcaster.rebroadcast_interval = 1s;
	auto & broadcaster_node = *system.add_node (config);
	auto & isolated_node = *system.make_disconnected_node ();

	nano::keypair destination;
	nano::send_block_builder block_builder;
	auto genesis_hash = nano::dev::genesis->hash ();
	auto send_block = block_builder.make_block ()
					  .previous (genesis_hash)
					  .destination (destination.pub)
					  .balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio)
					  .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
					  .work (*system.work.generate (genesis_hash))
					  .build ();

	auto processed = broadcaster_node.process_local (send_block);
	ASSERT_TRUE (processed);
	// With elections disabled the block must never become an active election
	ASSERT_NEVER (500ms, broadcaster_node.active.active (send_block->qualified_root ()));

	// Wait until a broadcast is attempted
	ASSERT_TIMELY_EQ (5s, broadcaster_node.local_block_broadcaster.size (), 1);
	ASSERT_TIMELY (5s, broadcaster_node.stats.count (nano::stat::type::local_block_broadcaster, nano::stat::detail::broadcast, nano::stat::dir::out) >= 1);

	// The disconnected node must not have received the block yet
	ASSERT_NEVER (500ms, isolated_node.block (send_block->hash ()));

	// Connect the nodes and check that the block is propagated
	broadcaster_node.network.merge_peer (isolated_node.network.endpoint ());
	ASSERT_TIMELY (5s, broadcaster_node.network.find_node_id (isolated_node.get_node_id ()));
	ASSERT_TIMELY (10s, isolated_node.block (send_block->hash ()));
}
1 change: 1 addition & 0 deletions nano/lib/logging_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ enum class type
signal_manager,
peer_history,
message_processor,
local_block_broadcaster,

// bootstrap
bulk_pull_client,
Expand Down
2 changes: 2 additions & 0 deletions nano/node/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
namespace nano
{
class active_elections;
class confirming_set;
class ledger;
class local_block_broadcaster;
class local_vote_history;
class logger;
class network;
Expand Down
146 changes: 101 additions & 45 deletions nano/node/local_block_broadcaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,22 @@
#include <nano/lib/threading.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/local_block_broadcaster.hpp>
#include <nano/node/network.hpp>
#include <nano/node/node.hpp>
#include <nano/secure/ledger.hpp>

nano::local_block_broadcaster::local_block_broadcaster (nano::node & node_a, nano::block_processor & block_processor_a, nano::network & network_a, nano::stats & stats_a, bool enabled_a) :
nano::local_block_broadcaster::local_block_broadcaster (local_block_broadcaster_config const & config_a, nano::node & node_a, nano::block_processor & block_processor_a, nano::network & network_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a, bool enabled_a) :
config{ config_a },
node{ node_a },
block_processor{ block_processor_a },
network{ network_a },
confirming_set{ confirming_set_a },
stats{ stats_a },
enabled{ enabled_a }
logger{ logger_a },
enabled{ enabled_a },
limiter{ config.broadcast_rate_limit, config.broadcast_rate_burst_ratio }
{
if (!enabled)
{
Expand All @@ -26,9 +31,20 @@ nano::local_block_broadcaster::local_block_broadcaster (nano::node & node_a, nan
// Only rebroadcast local blocks that were successfully processed (no forks or gaps)
if (result == nano::block_status::progress && context.source == nano::block_source::local)
{
release_assert (context.block != nullptr);

nano::lock_guard<nano::mutex> guard{ mutex };

local_blocks.emplace_back (local_entry{ context.block, std::chrono::steady_clock::now () });
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::insert);

// Erase oldest blocks if the queue gets too big
while (local_blocks.size () > config.max_size)
{
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::erase_oldest);
local_blocks.pop_front ();
}

should_notify = true;
}
}
Expand All @@ -41,7 +57,13 @@ nano::local_block_broadcaster::local_block_broadcaster (nano::node & node_a, nan
block_processor.rolled_back.add ([this] (auto const & block) {
nano::lock_guard<nano::mutex> guard{ mutex };
auto erased = local_blocks.get<tag_hash> ().erase (block->hash ());
stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::rollback, nano::stat::dir::in, erased);
stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::rollback, erased);
});

confirming_set.cemented_observers.add ([this] (auto const & block) {
nano::lock_guard<nano::mutex> guard{ mutex };
auto erased = local_blocks.get<tag_hash> ().erase (block->hash ());
stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::cemented, erased);
});
}

Expand Down Expand Up @@ -76,44 +98,77 @@ void nano::local_block_broadcaster::stop ()
nano::join_or_pass (thread);
}

size_t nano::local_block_broadcaster::size () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
return local_blocks.size ();
}

void nano::local_block_broadcaster::run ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::loop);

condition.wait_for (lock, check_interval);
condition.wait_for (lock, 1s);
debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds

if (!stopped)
if (!stopped && !local_blocks.empty ())
{
cleanup ();
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::loop);

if (cleanup_interval.elapsed (config.cleanup_interval))
{
cleanup (lock);
debug_assert (lock.owns_lock ());
}

run_broadcasts (lock);
debug_assert (lock.owns_lock ());
debug_assert (!lock.owns_lock ());
lock.lock ();
}
}
}

// Incremental backoff: each successive rebroadcast of a block waits
// proportionally longer, capped at the configured maximum interval
std::chrono::milliseconds nano::local_block_broadcaster::rebroadcast_interval (unsigned rebroadcasts) const
{
	auto const scaled = config.rebroadcast_interval * rebroadcasts;
	return scaled < config.max_rebroadcast_interval ? scaled : config.max_rebroadcast_interval;
}

void nano::local_block_broadcaster::run_broadcasts (nano::unique_lock<nano::mutex> & lock)
{
debug_assert (lock.owns_lock ());
debug_assert (!mutex.try_lock ());

std::vector<std::shared_ptr<nano::block>> to_broadcast;
std::deque<local_entry> to_broadcast;

auto const now = std::chrono::steady_clock::now ();
for (auto & entry : local_blocks)

// Iterate blocks with next_broadcast <= now
auto & by_broadcast = local_blocks.get<tag_broadcast> ();
for (auto const & entry : boost::make_iterator_range (by_broadcast.begin (), by_broadcast.upper_bound (now)))
{
if (elapsed (entry.last_broadcast, broadcast_interval, now))
{
debug_assert (entry.next_broadcast <= now);
release_assert (entry.block != nullptr);
to_broadcast.push_back (entry);
}

// Modify multi index container outside of the loop to avoid invalidating iterators
auto & by_hash = local_blocks.get<tag_hash> ();
for (auto const & entry : to_broadcast)
{
auto it = by_hash.find (entry.hash ());
release_assert (it != by_hash.end ());
bool success = by_hash.modify (it, [this, now] (auto & entry) {
entry.rebroadcasts += 1;
entry.last_broadcast = now;
to_broadcast.push_back (entry.block);
}
entry.next_broadcast = now + rebroadcast_interval (entry.rebroadcasts);
});
release_assert (success, "modify failed"); // Should never fail
}

lock.unlock ();

for (auto const & block : to_broadcast)
for (auto const & entry : to_broadcast)
{
while (!limiter.should_pass (1))
{
Expand All @@ -124,41 +179,47 @@ void nano::local_block_broadcaster::run_broadcasts (nano::unique_lock<nano::mute
}
}

stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::broadcast, nano::stat::dir::out);
logger.debug (nano::log::type::local_block_broadcaster, "Broadcasting block: {} (rebroadcasts so far: {})",
entry.block->hash ().to_string (),
entry.rebroadcasts);

network.flood_block_initial (block);
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::broadcast, nano::stat::dir::out);
network.flood_block_initial (entry.block);
}

lock.lock ();
}

void nano::local_block_broadcaster::cleanup ()
void nano::local_block_broadcaster::cleanup (nano::unique_lock<nano::mutex> & lock)
{
debug_assert (!mutex.try_lock ());

// Erase oldest blocks if the queue gets too big
while (local_blocks.size () > max_size)
{
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::erase_oldest);
local_blocks.pop_front ();
}
// Copy the local blocks to avoid holding the mutex during IO
auto local_blocks_copy = local_blocks;

// TODO: Mutex is held during IO, but it should be fine since it's not performance critical
auto transaction = node.ledger.tx_begin_read ();
erase_if (local_blocks, [this, &transaction] (auto const & entry) {
transaction.refresh_if_needed ();
lock.unlock ();

if (entry.last_broadcast == std::chrono::steady_clock::time_point{})
std::set<nano::block_hash> already_confirmed;
{
auto transaction = node.ledger.tx_begin_read ();
for (auto const & entry : local_blocks_copy)
{
// This block has never been broadcasted, keep it so it's broadcasted at least once
return false;
}
if (node.block_confirmed_or_being_confirmed (transaction, entry.block->hash ()))
{
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::erase_confirmed);
return true;
if (entry.last_broadcast == std::chrono::steady_clock::time_point{})
{
continue;
}
if (node.block_confirmed_or_being_confirmed (transaction, entry.block->hash ()))
{
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::already_confirmed);
already_confirmed.insert (entry.block->hash ());
}
}
return false;
}

lock.lock ();

// Erase blocks that have been confirmed
erase_if (local_blocks, [&already_confirmed] (auto const & entry) {
return already_confirmed.contains (entry.block->hash ());
});
}

Expand All @@ -169,9 +230,4 @@ std::unique_ptr<nano::container_info_component> nano::local_block_broadcaster::c
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "local", local_blocks.size (), sizeof (decltype (local_blocks)::value_type) }));
return composite;
}

// Hash of the tracked block; used as the key for the tag_hash index of local_blocks
nano::block_hash nano::local_block_broadcaster::local_entry::hash () const
{
	auto const & tracked = block;
	return tracked->hash ();
}
}
Loading
Loading