Limit batch size for block processor #4655

Merged
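For orientation: the change below caps each block-processor write pass. Up to a fixed number of queued blocks are drained while holding the queue mutex (next_batch (256)), and the batch is then processed outside the lock within a single write transaction. A minimal standalone sketch of that pattern follows; every name and type in it is invented for illustration and is not the node's actual API.

```cpp
// Illustrative sketch only: mirrors the shape of next_batch ()/process_batch () below,
// using made-up types instead of the node's queue, ledger and transaction classes.
#include <cstddef>
#include <deque>
#include <iostream>
#include <mutex>
#include <string>

std::mutex queue_mutex;
std::deque<std::string> queue; // stand-in for the block queue

// Drain at most max_count items; the caller must already hold queue_mutex,
// just as the real next_batch () asserts with debug_assert (!mutex.try_lock ()).
std::deque<std::string> next_batch (std::size_t max_count)
{
	std::deque<std::string> results;
	while (!queue.empty () && results.size () < max_count)
	{
		results.push_back (queue.front ());
		queue.pop_front ();
	}
	return results;
}

void process_batch (std::unique_lock<std::mutex> & lock)
{
	// Take a bounded batch under the lock, then release the lock before the slow
	// work so producers can keep enqueueing while this batch is written out.
	auto batch = next_batch (256);
	lock.unlock ();

	for (auto const & item : batch)
	{
		std::cout << "processing " << item << '\n'; // stands in for process_one () inside one write transaction
	}
}

int main ()
{
	for (int i = 0; i < 300; ++i)
	{
		queue.push_back ("block-" + std::to_string (i));
	}

	std::unique_lock<std::mutex> lock{ queue_mutex };
	process_batch (lock); // handles at most 256 blocks per call
	std::cout << queue.size () << " blocks left queued for the next pass\n";
}
```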
47 changes: 0 additions & 47 deletions nano/core_test/node.cpp
@@ -2646,53 +2646,6 @@ TEST (node, block_processor_full)
ASSERT_TIMELY (5s, node.block_processor.full ());
}

TEST (node, block_processor_half_full)
{
nano::test::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 6;
node_flags.force_use_write_queue = true;
auto & node = *system.add_node (nano::node_config (system.get_available_port ()), node_flags);
nano::state_block_builder builder;
auto send1 = builder.make_block ()
.account (nano::dev::genesis_key.pub)
.previous (nano::dev::genesis->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio)
.link (nano::dev::genesis_key.pub)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*node.work_generate_blocking (nano::dev::genesis->hash ()))
.build ();
auto send2 = builder.make_block ()
.account (nano::dev::genesis_key.pub)
.previous (send1->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount - 2 * nano::Gxrb_ratio)
.link (nano::dev::genesis_key.pub)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*node.work_generate_blocking (send1->hash ()))
.build ();
auto send3 = builder.make_block ()
.account (nano::dev::genesis_key.pub)
.previous (send2->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount - 3 * nano::Gxrb_ratio)
.link (nano::dev::genesis_key.pub)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*node.work_generate_blocking (send2->hash ()))
.build ();
// The write guard prevents block processor doing any writes
auto write_guard = node.store.write_queue.wait (nano::store::writer::testing);
node.block_processor.add (send1);
ASSERT_FALSE (node.block_processor.half_full ());
node.block_processor.add (send2);
ASSERT_FALSE (node.block_processor.half_full ());
node.block_processor.add (send3);
// Block processor may be not half_full during state blocks signatures verification
ASSERT_TIMELY (2s, node.block_processor.half_full ());
ASSERT_FALSE (node.block_processor.full ());
}

TEST (node, confirm_back)
{
nano::test::system system (1);
67 changes: 38 additions & 29 deletions nano/node/blockprocessor.cpp
@@ -241,7 +241,13 @@ void nano::block_processor::run ()
{
if (!queue.empty ())
{
lock.unlock ();
// TODO: Cleaner periodical logging
if (should_log ())
{
node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue",
queue.size (),
queue.size ({ nano::block_source::forced }));
}

auto processed = process_batch (lock);
debug_assert (!lock.owns_lock ());
@@ -291,40 +297,47 @@ auto nano::block_processor::next () -> context
release_assert (false, "next() called when no blocks are ready");
}

auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock_a) -> processed_batch_t
auto nano::block_processor::next_batch (size_t max_count) -> std::deque<context>
{
processed_batch_t processed;
debug_assert (!mutex.try_lock ());
debug_assert (!queue.empty ());

auto transaction = node.ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights }, nano::store::writer::blockprocessor);
nano::timer<std::chrono::milliseconds> timer_l;
queue.periodic_update ();

lock_a.lock ();
std::deque<context> results;
while (!queue.empty () && results.size () < max_count)
Contributor: Doesn't fair_queue already have a next_batch()?

Contributor Author: It has, but there were some additional checks in block_processor::next (), so it wasn't a drop-in replacement.

{
results.push_back (next ());
}
return results;
}

queue.periodic_update ();
auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock) -> processed_batch_t
{
debug_assert (lock.owns_lock ());
debug_assert (!mutex.try_lock ());
debug_assert (!queue.empty ());

auto batch = next_batch (256);

lock.unlock ();

timer_l.start ();
auto transaction = node.ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights }, nano::store::writer::blockprocessor);

nano::timer<std::chrono::milliseconds> timer;
timer.start ();

// Processing blocks
unsigned number_of_blocks_processed (0), number_of_forced_processed (0);
auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); };
auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; };
auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; };
size_t number_of_blocks_processed = 0;
size_t number_of_forced_processed = 0;

while (!queue.empty () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
processed_batch_t processed;
for (auto & ctx : batch)
{
// TODO: Cleaner periodical logging
if (should_log ())
{
node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue",
queue.size (),
queue.size ({ nano::block_source::forced }));
}

auto ctx = next ();
auto const hash = ctx.block->hash ();
bool const force = ctx.source == nano::block_source::forced;

lock_a.unlock ();
transaction.refresh_if_needed ();

if (force)
{
@@ -336,15 +349,11 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock

auto result = process_one (transaction, ctx, force);
processed.emplace_back (result, std::move (ctx));

lock_a.lock ();
}

lock_a.unlock ();

if (number_of_blocks_processed != 0 && timer_l.stop () > std::chrono::milliseconds (100))
if (number_of_blocks_processed != 0 && timer.stop () > std::chrono::milliseconds (100))
{
node.logger.debug (nano::log::type::blockprocessor, "Processed {} blocks ({} forced) in {} {}", number_of_blocks_processed, number_of_forced_processed, timer_l.value ().count (), timer_l.unit ());
node.logger.debug (nano::log::type::blockprocessor, "Processed {} blocks ({} forced) in {} {}", number_of_blocks_processed, number_of_forced_processed, timer.value ().count (), timer.unit ());
}

return processed;
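For comparison with the removed loop above: the old write pass stopped on a combination of a time deadline (block_processor_batch_max_time), the configured processor batch size, and the store's write-batch limit, whereas the new code simply drains a fixed-size batch up front via next_batch (256). A condensed, self-contained sketch of the two stopping styles; the thresholds below are placeholders, not the node's configuration values.

```cpp
// Placeholder sketch comparing the removed stop conditions with the new fixed cap.
#include <chrono>
#include <cstddef>
#include <deque>
#include <iostream>

int main ()
{
	std::deque<int> queue (1000, 42); // stand-in for queued blocks

	// Old style (mirrors the removed lambdas): keep going until a deadline,
	// a processor batch limit, or a store write-batch limit is hit.
	using namespace std::chrono;
	auto const deadline = steady_clock::now () + milliseconds (500); // placeholder for block_processor_batch_max_time
	std::size_t processed = 0;
	std::size_t const processor_batch_max = 256; // placeholder for flags.block_processor_batch_size
	std::size_t const store_batch_max = 512; // placeholder for store.max_block_write_batch_num ()

	auto deadline_reached = [&] { return steady_clock::now () > deadline; };
	auto processor_batch_reached = [&] { return processed >= processor_batch_max; };
	auto store_batch_reached = [&] { return processed >= store_batch_max; };

	while (!queue.empty () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
	{
		queue.pop_front ();
		++processed;
	}
	std::cout << "old-style pass processed " << processed << " items\n";

	// New style (mirrors next_batch (256)): take at most a fixed number of items,
	// then process them; the deadline and cap checks above disappear.
	std::size_t const max_count = 256;
	std::deque<int> batch;
	while (!queue.empty () && batch.size () < max_count)
	{
		batch.push_back (queue.front ());
		queue.pop_front ();
	}
	std::cout << "new-style pass drained " << batch.size () << " items\n";
}
```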
7 changes: 5 additions & 2 deletions nano/node/blockprocessor.hpp
@@ -93,13 +93,15 @@ class block_processor final

std::size_t size () const;
std::size_t size (block_source) const;
bool full () const;
bool half_full () const;
bool add (std::shared_ptr<nano::block> const &, block_source = block_source::live, std::shared_ptr<nano::transport::channel> const & channel = nullptr);
std::optional<nano::block_status> add_blocking (std::shared_ptr<nano::block> const & block, block_source);
void force (std::shared_ptr<nano::block> const &);
bool should_log ();

// TODO: Remove, used by legacy bootstrap
bool full () const;
bool half_full () const;

std::unique_ptr<container_info_component> collect_container_info (std::string const & name);

std::atomic<bool> flushing{ false };
@@ -120,6 +122,7 @@ class block_processor final
nano::block_status process_one (secure::write_transaction const &, context const &, bool forced = false);
void queue_unchecked (secure::write_transaction const &, nano::hash_or_account const &);
processed_batch_t process_batch (nano::unique_lock<nano::mutex> &);
std::deque<context> next_batch (size_t max_count);
context next ();
bool add_impl (context, std::shared_ptr<nano::transport::channel> const & channel = nullptr);
