Skip to content

Commit

Permalink
Support multiple work peers in the same host (nanocurrency#2477)
Browse files Browse the repository at this point in the history
* Correct check for peers when creating work

* Support multiple work peers in the same address

* Send cancels despite errors to account for non-conforming implementations

* Add tests using a fake work peer, acting as good, malicious or slow

* Comment
  • Loading branch information
guilhermelawless authored and wezrule committed Jan 22, 2020
1 parent f57b350 commit 815c24e
Show file tree
Hide file tree
Showing 6 changed files with 425 additions and 59 deletions.
1 change: 1 addition & 0 deletions nano/core_test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
add_executable (core_test
core_test_main.cc
testutil.hpp
fakes/work_peer.hpp
active_transactions.cpp
block.cpp
block_store.cpp
Expand Down
120 changes: 120 additions & 0 deletions nano/core_test/distributed_work.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <nano/core_test/fakes/work_peer.hpp>
#include <nano/core_test/testutil.hpp>
#include <nano/node/testing.hpp>

Expand Down Expand Up @@ -148,3 +149,122 @@ TEST (distributed_work, no_peers_multi)
}
count = 0;
}

TEST (distributed_work, peer)
{
nano::system system;
nano::node_config node_config;
node_config.peering_port = nano::get_available_port ();
// Disable local work generation
node_config.work_threads = 0;
auto node (system.add_node (node_config));
ASSERT_FALSE (node->local_work_generation_enabled ());
nano::block_hash hash{ 1 };
boost::optional<uint64_t> work;
std::atomic<bool> done{ false };
auto callback = [&work, &done](boost::optional<uint64_t> work_a) {
ASSERT_TRUE (work_a.is_initialized ());
work = work_a;
done = true;
};
auto work_peer (std::make_shared<fake_work_peer> (node->work, node->io_ctx, nano::get_available_port (), work_peer_type::good));
work_peer->start ();
decltype (node->config.work_peers) peers;
peers.emplace_back ("localhost", work_peer->port ());
ASSERT_FALSE (node->distributed_work.make (hash, peers, callback, node->network_params.network.publish_threshold, nano::account ()));
system.deadline_set (5s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_FALSE (nano::work_validate (hash, *work));
ASSERT_EQ (1, work_peer->generations_good);
ASSERT_EQ (0, work_peer->generations_bad);
ASSERT_NO_ERROR (system.poll ());
ASSERT_EQ (0, work_peer->cancels);
}

TEST (distributed_work, peer_malicious)
{
nano::system system (1);
auto node (system.nodes[0]);
ASSERT_TRUE (node->local_work_generation_enabled ());
nano::block_hash hash{ 1 };
boost::optional<uint64_t> work;
std::atomic<bool> done{ false };
auto callback = [&work, &done](boost::optional<uint64_t> work_a) {
ASSERT_TRUE (work_a.is_initialized ());
work = work_a;
done = true;
};
auto malicious_peer (std::make_shared<fake_work_peer> (node->work, node->io_ctx, nano::get_available_port (), work_peer_type::malicious));
malicious_peer->start ();
decltype (node->config.work_peers) peers;
peers.emplace_back ("localhost", malicious_peer->port ());
ASSERT_FALSE (node->distributed_work.make (hash, peers, callback, node->network_params.network.publish_threshold, nano::account ()));
system.deadline_set (5s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_FALSE (nano::work_validate (hash, *work));
system.deadline_set (3s);
while (malicious_peer->generations_bad < 2)
{
ASSERT_NO_ERROR (system.poll ());
}
// make sure it was *not* the malicious peer that replied
ASSERT_EQ (0, malicious_peer->generations_good);
// initial generation + the second time when it also starts doing local generation
ASSERT_EQ (2, malicious_peer->generations_bad);
// this peer should not receive a cancel
ASSERT_EQ (0, malicious_peer->cancels);
}

TEST (distributed_work, peer_multi)
{
nano::system system (1);
auto node (system.nodes[0]);
ASSERT_TRUE (node->local_work_generation_enabled ());
nano::block_hash hash{ 1 };
boost::optional<uint64_t> work;
std::atomic<bool> done{ false };
auto callback = [&work, &done](boost::optional<uint64_t> work_a) {
ASSERT_TRUE (work_a.is_initialized ());
work = work_a;
done = true;
};
auto good_peer (std::make_shared<fake_work_peer> (node->work, node->io_ctx, nano::get_available_port (), work_peer_type::good));
auto malicious_peer (std::make_shared<fake_work_peer> (node->work, node->io_ctx, nano::get_available_port (), work_peer_type::malicious));
auto slow_peer (std::make_shared<fake_work_peer> (node->work, node->io_ctx, nano::get_available_port (), work_peer_type::slow));
good_peer->start ();
malicious_peer->start ();
slow_peer->start ();
decltype (node->config.work_peers) peers;
peers.emplace_back ("localhost", malicious_peer->port ());
peers.emplace_back ("localhost", slow_peer->port ());
peers.emplace_back ("localhost", good_peer->port ());
ASSERT_FALSE (node->distributed_work.make (hash, peers, callback, node->network_params.network.publish_threshold, nano::account ()));
system.deadline_set (5s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_FALSE (nano::work_validate (hash, *work));
system.deadline_set (3s);
while (slow_peer->cancels < 1)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (0, malicious_peer->generations_good);
ASSERT_EQ (1, malicious_peer->generations_bad);
ASSERT_EQ (0, malicious_peer->cancels);

ASSERT_EQ (0, slow_peer->generations_good);
ASSERT_EQ (0, slow_peer->generations_bad);
ASSERT_EQ (1, slow_peer->cancels);

ASSERT_EQ (1, good_peer->generations_good);
ASSERT_EQ (0, good_peer->generations_bad);
ASSERT_EQ (0, good_peer->cancels);
}
252 changes: 252 additions & 0 deletions nano/core_test/fakes/work_peer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
#pragma once

#include <nano/lib/errors.hpp>
#include <nano/lib/locks.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/lib/work.hpp>
#include <nano/node/common.hpp>

#include <boost/asio.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>

#include <unordered_set>

namespace beast = boost::beast;
namespace http = beast::http;
namespace ptree = boost::property_tree;
namespace asio = boost::asio;
using tcp = boost::asio::ip::tcp;

namespace
{
enum class work_peer_type
{
good,
malicious,
slow
};

class work_peer_connection : public std::enable_shared_from_this<work_peer_connection>
{
const std::string generic_error = "Unable to parse JSON";
const std::string empty_response = "Empty response";

public:
work_peer_connection (asio::io_context & ioc_a, work_peer_type const type_a, nano::work_pool & pool_a, std::function<void(bool const)> on_generation_a, std::function<void()> on_cancel_a) :
socket (ioc_a),
type (type_a),
work_pool (pool_a),
on_generation (on_generation_a),
on_cancel (on_cancel_a),
timer (ioc_a)
{
}
void start ()
{
read_request ();
}
tcp::socket socket;

private:
work_peer_type type;
nano::work_pool & work_pool;
beast::flat_buffer buffer{ 8192 };
http::request<http::string_body> request;
http::response<http::dynamic_body> response;
std::function<void(bool const)> on_generation;
std::function<void()> on_cancel;
asio::deadline_timer timer;

void read_request ()
{
auto this_l = shared_from_this ();
http::async_read (socket, buffer, request, [this_l](beast::error_code ec, std::size_t const /*size_a*/) {
if (!ec)
{
this_l->process_request ();
}
});
}

void process_request ()
{
switch (request.method ())
{
case http::verb::post:
response.result (http::status::ok);
create_response ();
break;

default:
response.result (http::status::bad_request);
break;
}
}

void create_response ()
{
std::stringstream istream (request.body ());
try
{
ptree::ptree result;
ptree::read_json (istream, result);
handle (result);
}
catch (...)
{
error (generic_error);
write_response ();
}
response.version (request.version ());
response.keep_alive (false);
}

void write_response ()
{
auto this_l = shared_from_this ();
response.set (http::field::content_length, response.body ().size ());
http::async_write (socket, response, [this_l](beast::error_code ec, std::size_t /*size_a*/) {
this_l->socket.shutdown (tcp::socket::shutdown_send, ec);
this_l->socket.close ();
});
}

void error (std::string const & message_a)
{
ptree::ptree error_l;
error_l.put ("error", message_a);
std::stringstream ostream;
ptree::write_json (ostream, error_l);
beast::ostream (response.body ()) << ostream.str ();
}

void handle_generate (nano::block_hash const & hash_a)
{
if (type == work_peer_type::good)
{
auto hash = hash_a;
auto this_l (shared_from_this ());
work_pool.generate (hash, [this_l, hash](boost::optional<uint64_t> work_a) {
auto result = work_a.value_or (0);
uint64_t difficulty;
nano::work_validate (hash, result, &difficulty);
static nano::network_params params;
ptree::ptree message_l;
message_l.put ("work", nano::to_string_hex (result));
message_l.put ("difficulty", nano::to_string_hex (difficulty));
message_l.put ("multiplier", nano::to_string (nano::difficulty::to_multiplier (difficulty, params.network.publish_threshold)));
message_l.put ("hash", hash.to_string ());
std::stringstream ostream;
ptree::write_json (ostream, message_l);
beast::ostream (this_l->response.body ()) << ostream.str ();
// Delay response by 500ms as a slow peer, immediate async call for a good peer
this_l->timer.expires_from_now (boost::posix_time::milliseconds (this_l->type == work_peer_type::slow ? 500 : 0));
this_l->timer.async_wait ([this_l, result](const boost::system::error_code & ec) {
if (this_l->on_generation)
{
this_l->on_generation (result != 0);
}
this_l->write_response ();
});
});
}
else if (type == work_peer_type::malicious)
{
// Respond immediately with no work
on_generation (false);
write_response ();
}
}

void handle (ptree::ptree const & tree_a)
{
auto action_text (tree_a.get<std::string> ("action"));
auto hash_text (tree_a.get<std::string> ("hash"));
nano::block_hash hash;
hash.decode_hex (hash_text);
if (action_text == "work_generate")
{
handle_generate (hash);
}
else if (action_text == "work_cancel")
{
error (empty_response);
on_cancel ();
write_response ();
}
else
{
throw;
}
}
};

class fake_work_peer : public std::enable_shared_from_this<fake_work_peer>
{
public:
fake_work_peer (nano::work_pool & pool_a, asio::io_context & ioc_a, unsigned short port_a, work_peer_type const type_a) :
pool (pool_a),
endpoint (tcp::v4 (), port_a),
ioc (ioc_a),
acceptor (ioc_a, endpoint),
type (type_a)
{
}
void start ()
{
listen ();
}
unsigned short port () const
{
return endpoint.port ();
}
std::atomic<size_t> generations_good{ 0 };
std::atomic<size_t> generations_bad{ 0 };
std::atomic<size_t> cancels{ 0 };

private:
void listen ()
{
std::weak_ptr<fake_work_peer> this_w (shared_from_this ());
auto connection (std::make_shared<work_peer_connection> (ioc, type, pool,
[this_w](bool const good_generation) {
if (auto this_l = this_w.lock ())
{
if (good_generation)
{
++this_l->generations_good;
}
else
{
++this_l->generations_bad;
}
};
},
[this_w]() {
if (auto this_l = this_w.lock ())
{
++this_l->cancels;
}
}));
acceptor.async_accept (connection->socket, [connection, this_w](beast::error_code ec) {
if (!ec)
{
if (auto this_l = this_w.lock ())
{
connection->start ();
this_l->listen ();
}
}
});
}
nano::work_pool & pool;
tcp::endpoint endpoint;
asio::io_context & ioc;
tcp::acceptor acceptor;
work_peer_type const type;
};
}
Loading

0 comments on commit 815c24e

Please sign in to comment.