Skip to content

Commit

Permalink
Ensure propagation and removal for the work watcher (#2709)
Browse files Browse the repository at this point in the history
* Ensure propagation and removal for the work watcher

- Allow removing a block by its root, even if the hash does not match
- Simplify (2 less indent levels) `work_watcher::watching`
- Test suite ensuring removal and propagation in different conditions

* Remove duplicate test

* Simplify work_watcher.generation_disabled test and reduce test time

* Check still watched after work generation, before flooding (Serg comment)
  • Loading branch information
guilhermelawless authored Apr 10, 2020
1 parent f88da6d commit eb4e7fa
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 85 deletions.
149 changes: 112 additions & 37 deletions nano/core_test/wallet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1158,7 +1158,7 @@ TEST (wallet, deterministic_restore)
ASSERT_TRUE (wallet->exists (pub));
}

TEST (wallet, work_watcher_update)
TEST (work_watcher, update)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
Expand Down Expand Up @@ -1206,70 +1206,145 @@ TEST (wallet, work_watcher_update)
ASSERT_GT (updated_multiplier2, multiplier2);
}

TEST (wallet, work_watcher_generation_disabled)
TEST (work_watcher, propagate)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.enable_voting = false;
node_config.work_watcher_period = 1s;
node_config.work_threads = 0;
auto & node = *system.add_node (node_config);
nano::work_pool pool (std::numeric_limits<unsigned>::max ());
nano::genesis genesis;
nano::node_flags node_flags;
node_flags.disable_request_loop = true;
auto & node = *system.add_node (node_config, node_flags);
auto & wallet (*system.wallet (0));
wallet.insert_adhoc (nano::test_genesis_key.prv);
node_config.peering_port = nano::get_available_port ();
auto & node_passive = *system.add_node (node_config);
nano::keypair key;
auto block (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Mxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *pool.generate (genesis.hash ())));
auto difficulty (block->difficulty ());
node.wallets.watcher->add (block);
ASSERT_FALSE (node.process_local (block).code != nano::process_result::progress);
ASSERT_TRUE (node.wallets.watcher->is_watched (block->qualified_root ()));
auto multiplier = nano::normalized_multiplier (nano::difficulty::to_multiplier (difficulty, nano::work_threshold (block->work_version (), nano::block_details (nano::epoch::epoch_0, true, false, false))), node.network_params.network.publish_thresholds.epoch_1);
double updated_multiplier{ multiplier };
auto const block (wallet.send_action (nano::test_genesis_key.pub, key.pub, 100));
system.deadline_set (5s);
while (!node_passive.ledger.block_exists (block->hash ()))
{
ASSERT_NO_ERROR (system.poll ());
}
auto const multiplier (nano::normalized_multiplier (nano::difficulty::to_multiplier (block->difficulty (), nano::work_threshold (block->work_version (), nano::block_details (nano::epoch::epoch_0, false, false, false))), node.network_params.network.publish_thresholds.epoch_1));
auto updated_multiplier{ multiplier };
auto propagated_multiplier{ multiplier };
{
nano::lock_guard<std::mutex> guard (node.active.mutex);
node.active.trended_active_multiplier = multiplier * 1.001;
}
bool updated{ false };
bool propagated{ false };
system.deadline_set (10s);
while (!(updated && propagated))
{
nano::unique_lock<std::mutex> lock (node.active.mutex);
// Prevent active difficulty repopulating multipliers
node.network_params.network.request_interval_ms = 10000;
//fill multipliers_cb and update active difficulty;
for (auto i (0); i < node.active.multipliers_cb.size (); i++)
{
node.active.multipliers_cb.push_back (multiplier * (1.5 + i / 100.));
nano::lock_guard<std::mutex> guard (node.active.mutex);
{
auto const existing (node.active.roots.find (block->qualified_root ()));
ASSERT_NE (existing, node.active.roots.end ());
updated_multiplier = existing->multiplier;
}
}
node.active.update_active_multiplier (lock);
{
nano::lock_guard<std::mutex> guard (node_passive.active.mutex);
{
auto const existing (node_passive.active.roots.find (block->qualified_root ()));
ASSERT_NE (existing, node_passive.active.roots.end ());
propagated_multiplier = existing->multiplier;
}
}
updated = updated_multiplier != multiplier;
propagated = propagated_multiplier != multiplier;
ASSERT_NO_ERROR (system.poll ());
}
std::this_thread::sleep_for (5s);
ASSERT_GT (updated_multiplier, multiplier);
ASSERT_EQ (propagated_multiplier, updated_multiplier);
}

nano::lock_guard<std::mutex> guard (node.active.mutex);
TEST (work_watcher, removed_after_win)
{
nano::system system (1);
auto & node (*system.nodes[0]);
auto & wallet (*system.wallet (0));
wallet.insert_adhoc (nano::test_genesis_key.prv);
nano::keypair key;
ASSERT_EQ (0, wallet.wallets.watcher->size ());
auto const block1 (wallet.send_action (nano::test_genesis_key.pub, key.pub, 100));
ASSERT_EQ (1, wallet.wallets.watcher->size ());
system.deadline_set (5s);
while (node.wallets.watcher->is_watched (block1->qualified_root ()))
{
auto const existing (node.active.roots.find (block->qualified_root ()));
//if existing is junk the block has been confirmed already
ASSERT_NE (existing, node.active.roots.end ());
updated_multiplier = existing->multiplier;
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (updated_multiplier, multiplier);
ASSERT_TRUE (node.distributed_work.items.empty ());
ASSERT_EQ (0, node.wallets.watcher->size ());
}

TEST (wallet, work_watcher_removed)
TEST (work_watcher, removed_after_lose)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.enable_voting = false;
node_config.work_watcher_period = 1s;
auto & node = *system.add_node (node_config);
(void)node;
auto & wallet (*system.wallet (0));
wallet.insert_adhoc (nano::test_genesis_key.prv);
nano::keypair key;
ASSERT_EQ (0, wallet.wallets.watcher->size ());
auto const block (wallet.send_action (nano::test_genesis_key.pub, key.pub, 100));
ASSERT_EQ (1, wallet.wallets.watcher->size ());
auto transaction (wallet.wallets.tx_begin_write ());
system.deadline_set (3s);
while (0 == wallet.wallets.watcher->size ())
auto const block1 (wallet.send_action (nano::test_genesis_key.pub, key.pub, 100));
ASSERT_TRUE (node.wallets.watcher->is_watched (block1->qualified_root ()));
nano::genesis genesis;
auto fork1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::xrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ())));
node.process_active (fork1);
node.block_processor.flush ();
auto vote (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, fork1));
nano::confirm_ack message (vote);
node.network.process_message (message, nullptr);
system.deadline_set (5s);
while (node.wallets.watcher->is_watched (block1->qualified_root ()))
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (0, node.wallets.watcher->size ());
}

TEST (work_watcher, generation_disabled)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.enable_voting = false;
node_config.work_watcher_period = 1s;
node_config.work_threads = 0;
nano::node_flags node_flags;
node_flags.disable_request_loop = true;
auto & node = *system.add_node (node_config);
ASSERT_FALSE (node.work_generation_enabled ());
nano::work_pool pool (std::numeric_limits<unsigned>::max ());
nano::genesis genesis;
nano::keypair key;
auto block (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Mxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *pool.generate (genesis.hash ())));
auto difficulty (block->difficulty ());
node.wallets.watcher->add (block);
ASSERT_FALSE (node.process_local (block).code != nano::process_result::progress);
ASSERT_TRUE (node.wallets.watcher->is_watched (block->qualified_root ()));
auto multiplier = nano::normalized_multiplier (nano::difficulty::to_multiplier (difficulty, nano::work_threshold (block->work_version (), nano::block_details (nano::epoch::epoch_0, true, false, false))), node.network_params.network.publish_thresholds.epoch_1);
double updated_multiplier{ multiplier };
{
nano::lock_guard<std::mutex> guard (node.active.mutex);
node.active.trended_active_multiplier = multiplier * 10;
}
std::this_thread::sleep_for (2s);
ASSERT_TRUE (node.wallets.watcher->is_watched (block->qualified_root ()));
{
nano::lock_guard<std::mutex> guard (node.active.mutex);
auto const existing (node.active.roots.find (block->qualified_root ()));
ASSERT_NE (existing, node.active.roots.end ());
updated_multiplier = existing->multiplier;
}
ASSERT_EQ (updated_multiplier, multiplier);
ASSERT_TRUE (node.distributed_work.items.empty ());
}

TEST (wallet, work_watcher_cancel)
TEST (work_watcher, cancel)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
Expand Down
2 changes: 1 addition & 1 deletion nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ void nano::block_processor::process_batch (nano::unique_lock<std::mutex> & lock_
for (auto & i : rollback_list)
{
node.votes_cache.remove (i->hash ());
node.wallets.watcher->remove (i);
node.wallets.watcher->remove (*i);
// Stop all rolled back active transactions except initial
if (i->hash () != successor->hash ())
{
Expand Down
2 changes: 1 addition & 1 deletion nano/node/testing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ void nano::system::generate_rollback (nano::node & node_a, std::vector<nano::acc
debug_assert (!error);
for (auto & i : rollback_list)
{
node_a.wallets.watcher->remove (i);
node_a.wallets.watcher->remove (*i);
node_a.active.erase (*i);
}
}
Expand Down
78 changes: 33 additions & 45 deletions nano/node/wallet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1420,7 +1420,7 @@ node (node_a),
stopped (false)
{
node.observers.blocks.add ([this](nano::election_status const & status_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a) {
this->remove (status_a.winner);
this->remove (*status_a.winner);
});
}

Expand Down Expand Up @@ -1460,65 +1460,53 @@ void nano::work_watcher::watching (nano::qualified_root const & root_a, std::sha
std::weak_ptr<nano::work_watcher> watcher_w (shared_from_this ());
node.alarm.add (std::chrono::steady_clock::now () + node.config.work_watcher_period, [block_a, root_a, watcher_w]() {
auto watcher_l = watcher_w.lock ();
if (watcher_l && !watcher_l->stopped && block_a != nullptr)
if (watcher_l && !watcher_l->stopped && watcher_l->is_watched (root_a))
{
nano::unique_lock<std::mutex> lock (watcher_l->mutex);
if (watcher_l->watched.find (root_a) != watcher_l->watched.end ()) // not yet confirmed or cancelled
auto active_difficulty (watcher_l->node.active.limited_active_difficulty (*block_a));
/*
* Work watcher should still watch blocks even without work generation, although no rework is done
* Functionality may be added in the future that does not require updating work
*/
if (active_difficulty > block_a->difficulty () && watcher_l->node.work_generation_enabled ())
{
lock.unlock ();
auto active_difficulty (watcher_l->node.active.limited_active_difficulty (*block_a));
/*
* Work watcher should still watch blocks even without work generation, although no rework is done
* Functionality may be added in the future that does not require updating work
*/
if (active_difficulty > block_a->difficulty () && watcher_l->node.work_generation_enabled ())
{
watcher_l->node.work_generate (
block_a->work_version (), block_a->root (), active_difficulty, [watcher_l, block_a, root_a](boost::optional<uint64_t> work_a) {
if (block_a != nullptr && watcher_l != nullptr && !watcher_l->stopped)
watcher_l->node.work_generate (
block_a->work_version (), block_a->root (), active_difficulty, [watcher_l, block_a, root_a](boost::optional<uint64_t> work_a) {
if (watcher_l->is_watched (root_a))
{
if (work_a.is_initialized ())
{
bool updated_l{ false };
if (work_a.is_initialized ())
{
nano::state_block_builder builder;
std::error_code ec;
std::shared_ptr<nano::state_block> block (builder.from (*block_a).work (*work_a).build (ec));

if (!ec)
{
watcher_l->node.network.flood_block_initial (block);
watcher_l->node.active.update_difficulty (block);
watcher_l->update (root_a, block);
updated_l = true;
watcher_l->watching (root_a, block);
}
}
if (!updated_l)
debug_assert (nano::work_difficulty (block_a->work_version (), block_a->root (), *work_a) > block_a->difficulty ());
nano::state_block_builder builder;
std::error_code ec;
std::shared_ptr<nano::state_block> block (builder.from (*block_a).work (*work_a).build (ec));
if (!ec)
{
watcher_l->watching (root_a, block_a);
watcher_l->node.network.flood_block_initial (block);
watcher_l->node.active.update_difficulty (block);
watcher_l->update (root_a, block);
}
}
},
block_a->account ());
}
else
{
watcher_l->watching (root_a, block_a);
}
watcher_l->watching (root_a, block_a);
}
},
block_a->account ());
}
else
{
watcher_l->watching (root_a, block_a);
}
}
});
}

void nano::work_watcher::remove (std::shared_ptr<nano::block> block_a)
void nano::work_watcher::remove (nano::block const & block_a)
{
auto root_l (block_a->qualified_root ());
nano::lock_guard<std::mutex> lock (mutex);
auto existing (watched.find (root_l));
if (existing != watched.end () && existing->second->hash () == block_a->hash ())
auto existing (watched.find (block_a.qualified_root ()));
if (existing != watched.end ())
{
watched.erase (existing);
node.observers.work_cancel.notify (block_a->root ());
node.observers.work_cancel.notify (block_a.root ());
}
}

Expand Down
2 changes: 1 addition & 1 deletion nano/node/wallet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ class work_watcher final : public std::enable_shared_from_this<nano::work_watche
void add (std::shared_ptr<nano::block>);
void update (nano::qualified_root const &, std::shared_ptr<nano::state_block>);
void watching (nano::qualified_root const &, std::shared_ptr<nano::state_block>);
void remove (std::shared_ptr<nano::block>);
void remove (nano::block const &);
bool is_watched (nano::qualified_root const &);
size_t size ();
std::mutex mutex;
Expand Down

0 comments on commit eb4e7fa

Please sign in to comment.