From eca1ec8aebede8866a601d59ade4f435d84684bf Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Fri, 17 Jan 2020 22:43:25 +0000 Subject: [PATCH] Support multiple work peers in the same host (#2477) * Correct check for peers when creating work * Support multiple work peers in the same address * Send cancels despite errors to account for non-conforming implementations * Add tests using a fake work peer, acting as good, malicious or slow * Comment --- nano/core_test/CMakeLists.txt | 1 + nano/core_test/distributed_work.cpp | 120 ++++++++++++ nano/core_test/fakes/work_peer.hpp | 252 +++++++++++++++++++++++++ nano/node/distributed_work.cpp | 89 +++++---- nano/node/distributed_work.hpp | 20 +- nano/node/distributed_work_factory.cpp | 2 +- 6 files changed, 425 insertions(+), 59 deletions(-) create mode 100644 nano/core_test/fakes/work_peer.hpp diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index e6a12799d1..52bd342f3b 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -1,6 +1,7 @@ add_executable (core_test core_test_main.cc testutil.hpp + fakes/work_peer.hpp active_transactions.cpp block.cpp block_store.cpp diff --git a/nano/core_test/distributed_work.cpp b/nano/core_test/distributed_work.cpp index b1a3c7c6e8..1089220f65 100644 --- a/nano/core_test/distributed_work.cpp +++ b/nano/core_test/distributed_work.cpp @@ -1,3 +1,4 @@ +#include #include #include @@ -148,3 +149,122 @@ TEST (distributed_work, no_peers_multi) } count = 0; } + +TEST (distributed_work, peer) +{ + nano::system system; + nano::node_config node_config; + node_config.peering_port = nano::get_available_port (); + // Disable local work generation + node_config.work_threads = 0; + auto node (system.add_node (node_config)); + ASSERT_FALSE (node->local_work_generation_enabled ()); + nano::block_hash hash{ 1 }; + boost::optional work; + std::atomic done{ false }; + auto callback = [&work, &done](boost::optional work_a) { + ASSERT_TRUE (work_a.is_initialized ()); + work = work_a; + done = true; + }; + auto work_peer (std::make_shared (node->work, node->io_ctx, nano::get_available_port (), work_peer_type::good)); + work_peer->start (); + decltype (node->config.work_peers) peers; + peers.emplace_back ("localhost", work_peer->port ()); + ASSERT_FALSE (node->distributed_work.make (hash, peers, callback, node->network_params.network.publish_threshold, nano::account ())); + system.deadline_set (5s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_FALSE (nano::work_validate (hash, *work)); + ASSERT_EQ (1, work_peer->generations_good); + ASSERT_EQ (0, work_peer->generations_bad); + ASSERT_NO_ERROR (system.poll ()); + ASSERT_EQ (0, work_peer->cancels); +} + +TEST (distributed_work, peer_malicious) +{ + nano::system system (1); + auto node (system.nodes[0]); + ASSERT_TRUE (node->local_work_generation_enabled ()); + nano::block_hash hash{ 1 }; + boost::optional work; + std::atomic done{ false }; + auto callback = [&work, &done](boost::optional work_a) { + ASSERT_TRUE (work_a.is_initialized ()); + work = work_a; + done = true; + }; + auto malicious_peer (std::make_shared (node->work, node->io_ctx, nano::get_available_port (), work_peer_type::malicious)); + malicious_peer->start (); + decltype (node->config.work_peers) peers; + peers.emplace_back ("localhost", malicious_peer->port ()); + ASSERT_FALSE (node->distributed_work.make (hash, peers, callback, node->network_params.network.publish_threshold, nano::account ())); + system.deadline_set (5s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_FALSE (nano::work_validate (hash, *work)); + system.deadline_set (3s); + while (malicious_peer->generations_bad < 2) + { + ASSERT_NO_ERROR (system.poll ()); + } + // make sure it was *not* the malicious peer that replied + ASSERT_EQ (0, malicious_peer->generations_good); + // initial generation + the second time when it also starts doing local generation + ASSERT_EQ (2, malicious_peer->generations_bad); + // this peer should not receive a cancel + ASSERT_EQ (0, malicious_peer->cancels); +} + +TEST (distributed_work, peer_multi) +{ + nano::system system (1); + auto node (system.nodes[0]); + ASSERT_TRUE (node->local_work_generation_enabled ()); + nano::block_hash hash{ 1 }; + boost::optional work; + std::atomic done{ false }; + auto callback = [&work, &done](boost::optional work_a) { + ASSERT_TRUE (work_a.is_initialized ()); + work = work_a; + done = true; + }; + auto good_peer (std::make_shared (node->work, node->io_ctx, nano::get_available_port (), work_peer_type::good)); + auto malicious_peer (std::make_shared (node->work, node->io_ctx, nano::get_available_port (), work_peer_type::malicious)); + auto slow_peer (std::make_shared (node->work, node->io_ctx, nano::get_available_port (), work_peer_type::slow)); + good_peer->start (); + malicious_peer->start (); + slow_peer->start (); + decltype (node->config.work_peers) peers; + peers.emplace_back ("localhost", malicious_peer->port ()); + peers.emplace_back ("localhost", slow_peer->port ()); + peers.emplace_back ("localhost", good_peer->port ()); + ASSERT_FALSE (node->distributed_work.make (hash, peers, callback, node->network_params.network.publish_threshold, nano::account ())); + system.deadline_set (5s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_FALSE (nano::work_validate (hash, *work)); + system.deadline_set (3s); + while (slow_peer->cancels < 1) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (0, malicious_peer->generations_good); + ASSERT_EQ (1, malicious_peer->generations_bad); + ASSERT_EQ (0, malicious_peer->cancels); + + ASSERT_EQ (0, slow_peer->generations_good); + ASSERT_EQ (0, slow_peer->generations_bad); + ASSERT_EQ (1, slow_peer->cancels); + + ASSERT_EQ (1, good_peer->generations_good); + ASSERT_EQ (0, good_peer->generations_bad); + ASSERT_EQ (0, good_peer->cancels); +} diff --git a/nano/core_test/fakes/work_peer.hpp b/nano/core_test/fakes/work_peer.hpp new file mode 100644 index 0000000000..e67a48d734 --- /dev/null +++ b/nano/core_test/fakes/work_peer.hpp @@ -0,0 +1,252 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +namespace beast = boost::beast; +namespace http = beast::http; +namespace ptree = boost::property_tree; +namespace asio = boost::asio; +using tcp = boost::asio::ip::tcp; + +namespace +{ +enum class work_peer_type +{ + good, + malicious, + slow +}; + +class work_peer_connection : public std::enable_shared_from_this +{ + const std::string generic_error = "Unable to parse JSON"; + const std::string empty_response = "Empty response"; + +public: + work_peer_connection (asio::io_context & ioc_a, work_peer_type const type_a, nano::work_pool & pool_a, std::function on_generation_a, std::function on_cancel_a) : + socket (ioc_a), + type (type_a), + work_pool (pool_a), + on_generation (on_generation_a), + on_cancel (on_cancel_a), + timer (ioc_a) + { + } + void start () + { + read_request (); + } + tcp::socket socket; + +private: + work_peer_type type; + nano::work_pool & work_pool; + beast::flat_buffer buffer{ 8192 }; + http::request request; + http::response response; + std::function on_generation; + std::function on_cancel; + asio::deadline_timer timer; + + void read_request () + { + auto this_l = shared_from_this (); + http::async_read (socket, buffer, request, [this_l](beast::error_code ec, std::size_t const /*size_a*/) { + if (!ec) + { + this_l->process_request (); + } + }); + } + + void process_request () + { + switch (request.method ()) + { + case http::verb::post: + response.result (http::status::ok); + create_response (); + break; + + default: + response.result (http::status::bad_request); + break; + } + } + + void create_response () + { + std::stringstream istream (request.body ()); + try + { + ptree::ptree result; + ptree::read_json (istream, result); + handle (result); + } + catch (...) + { + error (generic_error); + write_response (); + } + response.version (request.version ()); + response.keep_alive (false); + } + + void write_response () + { + auto this_l = shared_from_this (); + response.set (http::field::content_length, response.body ().size ()); + http::async_write (socket, response, [this_l](beast::error_code ec, std::size_t /*size_a*/) { + this_l->socket.shutdown (tcp::socket::shutdown_send, ec); + this_l->socket.close (); + }); + } + + void error (std::string const & message_a) + { + ptree::ptree error_l; + error_l.put ("error", message_a); + std::stringstream ostream; + ptree::write_json (ostream, error_l); + beast::ostream (response.body ()) << ostream.str (); + } + + void handle_generate (nano::block_hash const & hash_a) + { + if (type == work_peer_type::good) + { + auto hash = hash_a; + auto this_l (shared_from_this ()); + work_pool.generate (hash, [this_l, hash](boost::optional work_a) { + auto result = work_a.value_or (0); + uint64_t difficulty; + nano::work_validate (hash, result, &difficulty); + static nano::network_params params; + ptree::ptree message_l; + message_l.put ("work", nano::to_string_hex (result)); + message_l.put ("difficulty", nano::to_string_hex (difficulty)); + message_l.put ("multiplier", nano::to_string (nano::difficulty::to_multiplier (difficulty, params.network.publish_threshold))); + message_l.put ("hash", hash.to_string ()); + std::stringstream ostream; + ptree::write_json (ostream, message_l); + beast::ostream (this_l->response.body ()) << ostream.str (); + // Delay response by 500ms as a slow peer, immediate async call for a good peer + this_l->timer.expires_from_now (boost::posix_time::milliseconds (this_l->type == work_peer_type::slow ? 500 : 0)); + this_l->timer.async_wait ([this_l, result](const boost::system::error_code & ec) { + if (this_l->on_generation) + { + this_l->on_generation (result != 0); + } + this_l->write_response (); + }); + }); + } + else if (type == work_peer_type::malicious) + { + // Respond immediately with no work + on_generation (false); + write_response (); + } + } + + void handle (ptree::ptree const & tree_a) + { + auto action_text (tree_a.get ("action")); + auto hash_text (tree_a.get ("hash")); + nano::block_hash hash; + hash.decode_hex (hash_text); + if (action_text == "work_generate") + { + handle_generate (hash); + } + else if (action_text == "work_cancel") + { + error (empty_response); + on_cancel (); + write_response (); + } + else + { + throw; + } + } +}; + +class fake_work_peer : public std::enable_shared_from_this +{ +public: + fake_work_peer (nano::work_pool & pool_a, asio::io_context & ioc_a, unsigned short port_a, work_peer_type const type_a) : + pool (pool_a), + endpoint (tcp::v4 (), port_a), + ioc (ioc_a), + acceptor (ioc_a, endpoint), + type (type_a) + { + } + void start () + { + listen (); + } + unsigned short port () const + { + return endpoint.port (); + } + std::atomic generations_good{ 0 }; + std::atomic generations_bad{ 0 }; + std::atomic cancels{ 0 }; + +private: + void listen () + { + std::weak_ptr this_w (shared_from_this ()); + auto connection (std::make_shared (ioc, type, pool, + [this_w](bool const good_generation) { + if (auto this_l = this_w.lock ()) + { + if (good_generation) + { + ++this_l->generations_good; + } + else + { + ++this_l->generations_bad; + } + }; + }, + [this_w]() { + if (auto this_l = this_w.lock ()) + { + ++this_l->cancels; + } + })); + acceptor.async_accept (connection->socket, [connection, this_w](beast::error_code ec) { + if (!ec) + { + if (auto this_l = this_w.lock ()) + { + connection->start (); + this_l->listen (); + } + } + }); + } + nano::work_pool & pool; + tcp::endpoint endpoint; + asio::io_context & ioc; + tcp::acceptor acceptor; + work_peer_type const type; +}; +} diff --git a/nano/node/distributed_work.cpp b/nano/node/distributed_work.cpp index 0eb55d6fef..46d8a6b5a2 100644 --- a/nano/node/distributed_work.cpp +++ b/nano/node/distributed_work.cpp @@ -11,7 +11,7 @@ std::shared_ptr nano::distributed_work::peer_request::get_prepared auto request (std::make_shared ()); request->method (boost::beast::http::verb::post); request->set (boost::beast::http::field::content_type, "application/json"); - auto address_string = boost::algorithm::erase_first_copy (address.to_string (), "::ffff:"); + auto address_string = boost::algorithm::erase_first_copy (endpoint.address ().to_string (), "::ffff:"); request->set (boost::beast::http::field::host, address_string); request->target ("/"); request->version (11); @@ -69,7 +69,7 @@ void nano::distributed_work::start () auto parsed_address (boost::asio::ip::make_address_v6 (current.first, ec)); if (!ec) { - outstanding[parsed_address] = current.second; + outstanding.emplace_back (parsed_address, current.second); start (); } else @@ -80,7 +80,7 @@ void nano::distributed_work::start () for (auto i (i_a), n (boost::asio::ip::udp::resolver::iterator{}); i != n; ++i) { auto endpoint (i->endpoint ()); - this_l->outstanding[endpoint.address ()] = endpoint.port (); + this_l->outstanding.emplace_back (endpoint.address (), endpoint.port ()); } } else @@ -100,13 +100,11 @@ void nano::distributed_work::start_work () if (!outstanding.empty ()) { nano::lock_guard guard (mutex); - for (auto const & i : outstanding) + for (auto const & endpoint : outstanding) { - auto host (i.first); - auto service (i.second); - auto connection (std::make_shared (this_l->node.io_ctx, host, service)); + auto connection (std::make_shared (this_l->node.io_ctx, endpoint)); connections.emplace_back (connection); - connection->socket.async_connect (nano::tcp_endpoint (host, service), + connection->socket.async_connect (connection->endpoint, boost::asio::bind_executor (strand, [this_l, connection](boost::system::error_code const & ec) { if (!ec && !this_l->stopped) @@ -137,42 +135,35 @@ void nano::distributed_work::start_work () { if (connection->response.result () == boost::beast::http::status::ok) { - this_l->success (connection->response.body (), connection->address, connection->port); + this_l->success (connection->response.body (), connection->endpoint); } else if (ec) { - this_l->node.logger.try_log (boost::str (boost::format ("Work peer responded with an error %1% %2%: %3%") % connection->address % connection->port % connection->response.result ())); - this_l->add_bad_peer (connection->address, connection->port); - this_l->failure (connection->address); + this_l->node.logger.try_log (boost::str (boost::format ("Work peer responded with an error %1% %2%: %3%") % connection->endpoint.address () % connection->endpoint.port () % connection->response.result ())); + this_l->add_bad_peer (connection->endpoint); + this_l->failure (connection->endpoint); } } - else if (ec == boost::system::errc::operation_canceled) - { - // The only case where we send a cancel is if we preempt stopped waiting for the response - this_l->cancel (*connection); - this_l->failure (connection->address); - } else if (ec) { - this_l->node.logger.try_log (boost::str (boost::format ("Unable to read from work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); - this_l->add_bad_peer (connection->address, connection->port); - this_l->failure (connection->address); + this_l->cancel (*connection); + this_l->failure (connection->endpoint); } })); } else if (ec && ec != boost::system::errc::operation_canceled) { - this_l->node.logger.try_log (boost::str (boost::format ("Unable to write to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); - this_l->add_bad_peer (connection->address, connection->port); - this_l->failure (connection->address); + this_l->node.logger.try_log (boost::str (boost::format ("Unable to write to work_peer %1% %2%: %3% (%4%)") % connection->endpoint.address () % connection->endpoint.port () % ec.message () % ec.value ())); + this_l->add_bad_peer (connection->endpoint); + this_l->failure (connection->endpoint); } })); } else if (ec && ec != boost::system::errc::operation_canceled) { - this_l->node.logger.try_log (boost::str (boost::format ("Unable to connect to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); - this_l->add_bad_peer (connection->address, connection->port); - this_l->failure (connection->address); + this_l->node.logger.try_log (boost::str (boost::format ("Unable to connect to work_peer %1% %2%: %3% (%4%)") % connection->endpoint.address () % connection->endpoint.port () % ec.message () % ec.value ())); + this_l->add_bad_peer (connection->endpoint); + this_l->failure (connection->endpoint); } })); } @@ -209,8 +200,8 @@ void nano::distributed_work::start_work () void nano::distributed_work::cancel (peer_request const & connection_a) { auto this_l (shared_from_this ()); - auto cancelling_l (std::make_shared (node.io_ctx, connection_a.address, connection_a.port)); - cancelling_l->socket.async_connect (nano::tcp_endpoint (cancelling_l->address, cancelling_l->port), + auto cancelling_l (std::make_shared (node.io_ctx, connection_a.endpoint)); + cancelling_l->socket.async_connect (cancelling_l->endpoint, boost::asio::bind_executor (strand, [this_l, cancelling_l](boost::system::error_code const & ec) { if (!ec) @@ -230,16 +221,16 @@ void nano::distributed_work::cancel (peer_request const & connection_a) [this_l, peer_cancel, cancelling_l](boost::system::error_code const & ec, size_t bytes_transferred) { if (ec && ec != boost::system::errc::operation_canceled) { - this_l->node.logger.try_log (boost::str (boost::format ("Unable to send work_cancel to work_peer %1% %2%: %3% (%4%)") % cancelling_l->address % cancelling_l->port % ec.message () % ec.value ())); + this_l->node.logger.try_log (boost::str (boost::format ("Unable to send work_cancel to work_peer %1% %2%: %3% (%4%)") % cancelling_l->endpoint.address () % cancelling_l->endpoint.port () % ec.message () % ec.value ())); } })); } })); } -void nano::distributed_work::success (std::string const & body_a, boost::asio::ip::address const & address_a, uint16_t port_a) +void nano::distributed_work::success (std::string const & body_a, nano::tcp_endpoint const & endpoint_a) { - auto last (remove (address_a)); + auto last (remove (endpoint_a)); std::stringstream istream (body_a); try { @@ -253,27 +244,27 @@ void nano::distributed_work::success (std::string const & body_a, boost::asio::i if (!nano::work_validate (request.root, work, &result_difficulty) && result_difficulty >= request.difficulty) { node.unresponsive_work_peers = false; - set_once (work, boost::str (boost::format ("%1%:%2%") % address_a % port_a)); + set_once (work, boost::str (boost::format ("%1%:%2%") % endpoint_a.address () % endpoint_a.port ())); stop_once (true); } else { - node.logger.try_log (boost::str (boost::format ("Incorrect work response from %1%:%2% for root %3% with diffuculty %4%: %5%") % address_a % port_a % request.root.to_string () % nano::to_string_hex (request.difficulty) % work_text)); - add_bad_peer (address_a, port_a); + node.logger.try_log (boost::str (boost::format ("Incorrect work response from %1%:%2% for root %3% with diffuculty %4%: %5%") % endpoint_a.address () % endpoint_a.port () % request.root.to_string () % nano::to_string_hex (request.difficulty) % work_text)); + add_bad_peer (endpoint_a); handle_failure (last); } } else { - node.logger.try_log (boost::str (boost::format ("Work response from %1%:%2% wasn't a number: %3%") % address_a % port_a % work_text)); - add_bad_peer (address_a, port_a); + node.logger.try_log (boost::str (boost::format ("Work response from %1%:%2% wasn't a number: %3%") % endpoint_a.address () % endpoint_a.port () % work_text)); + add_bad_peer (endpoint_a); handle_failure (last); } } catch (...) { - node.logger.try_log (boost::str (boost::format ("Work response from %1%:%2% wasn't parsable: %3%") % address_a % port_a % body_a)); - add_bad_peer (address_a, port_a); + node.logger.try_log (boost::str (boost::format ("Work response from %1%:%2% wasn't parsable: %3%") % endpoint_a.address () % endpoint_a.port () % body_a)); + add_bad_peer (endpoint_a); handle_failure (last); } } @@ -302,12 +293,12 @@ void nano::distributed_work::stop_once (bool const local_stop_a) connection_l->socket.close (ec); if (ec) { - this_l->node.logger.try_log (boost::str (boost::format ("Error closing socket with work_peer %1% %2%: %3%") % connection_l->address % connection_l->port % ec.message () % ec.value ())); + this_l->node.logger.try_log (boost::str (boost::format ("Error closing socket with work_peer %1% %2%: %3%") % connection_l->endpoint.address () % connection_l->endpoint.port () % ec.message () % ec.value ())); } } else { - this_l->node.logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection_l->address % connection_l->port % ec.message () % ec.value ())); + this_l->node.logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection_l->endpoint.address () % connection_l->endpoint.port () % ec.message () % ec.value ())); } } })); @@ -357,9 +348,9 @@ void nano::distributed_work::cancel () } } -void nano::distributed_work::failure (boost::asio::ip::address const & address_a) +void nano::distributed_work::failure (nano::tcp_endpoint const & endpoint_a) { - auto last (remove (address_a)); + auto last (remove (endpoint_a)); handle_failure (last); } @@ -399,15 +390,19 @@ void nano::distributed_work::handle_failure (bool const last_a) } } -bool nano::distributed_work::remove (boost::asio::ip::address const & address_a) +bool nano::distributed_work::remove (nano::tcp_endpoint const & endpoint_a) { nano::lock_guard guard (mutex); - outstanding.erase (address_a); + auto existing (std::find (outstanding.begin (), outstanding.end (), endpoint_a)); + if (existing != outstanding.end ()) + { + outstanding.erase (existing); + } return outstanding.empty (); } -void nano::distributed_work::add_bad_peer (boost::asio::ip::address const & address_a, uint16_t port_a) +void nano::distributed_work::add_bad_peer (nano::tcp_endpoint const & endpoint_a) { nano::lock_guard guard (mutex); - bad_peers.emplace_back (boost::str (boost::format ("%1%:%2%") % address_a % port_a)); + bad_peers.emplace_back (boost::str (boost::format ("%1%:%2%") % endpoint_a.address () % endpoint_a.port ())); } diff --git a/nano/node/distributed_work.hpp b/nano/node/distributed_work.hpp index a015174076..3e536cb8c3 100644 --- a/nano/node/distributed_work.hpp +++ b/nano/node/distributed_work.hpp @@ -6,10 +6,10 @@ #include #include #include +#include #include -#include #include using request_type = boost::beast::http::request; @@ -52,15 +52,13 @@ class distributed_work final : public std::enable_shared_from_this get_prepared_json_request (std::string const &) const; - boost::asio::ip::address address; - uint16_t port; + nano::tcp_endpoint const endpoint; boost::beast::flat_buffer buffer; boost::beast::http::response response; boost::asio::ip::tcp::socket socket; @@ -77,14 +75,14 @@ class distributed_work final : public std::enable_shared_from_this strand; std::vector> need_resolve; - std::map outstanding; + std::vector outstanding; std::vector> connections; work_generation_status status{ work_generation_status::ongoing }; diff --git a/nano/node/distributed_work_factory.cpp b/nano/node/distributed_work_factory.cpp index 4dd5d43fef..8e505ee7a6 100644 --- a/nano/node/distributed_work_factory.cpp +++ b/nano/node/distributed_work_factory.cpp @@ -22,7 +22,7 @@ bool nano::distributed_work_factory::make (std::chrono::seconds const & backoff_ if (!stopped) { cleanup_finished (); - if (node.work_generation_enabled ()) + if (node.work_generation_enabled (request_a.peers)) { auto distributed (std::make_shared (node, request_a, backoff_a)); {