Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed work fixes #2230

Merged
merged 17 commits into from
Aug 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions nano/lib/work.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void nano::work_pool::loop (uint64_t thread)
blake2b_init (&hash, sizeof (output));
std::unique_lock<std::mutex> lock (mutex);
auto pow_sleep = pow_rate_limiter;
while (!done || !pending.empty ())
while (!done)
{
auto empty (pending.empty ());
if (thread == 0)
Expand Down Expand Up @@ -139,33 +139,37 @@ void nano::work_pool::loop (uint64_t thread)
void nano::work_pool::cancel (nano::uint256_union const & root_a)
{
std::lock_guard<std::mutex> lock (mutex);
if (!pending.empty ())
if (!done)
{
if (pending.front ().item == root_a)
if (!pending.empty ())
{
++ticket;
if (pending.front ().item == root_a)
{
++ticket;
}
}
pending.remove_if ([&root_a](decltype (pending)::value_type const & item_a) {
bool result;
if (item_a.item == root_a)
{
item_a.callback (boost::none);
result = true;
}
else
{
result = false;
}
return result;
});
}
pending.remove_if ([&root_a](decltype (pending)::value_type const & item_a) {
bool result;
if (item_a.item == root_a)
{
item_a.callback (boost::none);
result = true;
}
else
{
result = false;
}
return result;
});
}

// Signals the work pool to shut down: sets `done` and increments `ticket`
// while holding `mutex`, then wakes every thread waiting on
// `producer_condition`.
void nano::work_pool::stop ()
{
{
// Inner scope: the lock_guard is released at the closing brace, before
// the notification below is issued.
std::lock_guard<std::mutex> lock (mutex);
done = true;
// NOTE(review): presumably bumping the ticket makes worker threads abandon
// any in-flight work item, mirroring what cancel () does for the front
// item -- confirm against loop ().
++ticket;
}
producer_condition.notify_all ();
}
Expand Down
237 changes: 147 additions & 90 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,11 @@ startup_time (std::chrono::steady_clock::now ())
logger.always_log ("Active network: ", network_label);

logger.always_log (boost::str (boost::format ("Work pool running %1% threads") % work.threads.size ()));
logger.always_log (boost::str (boost::format ("%1% work peers configured") % config.work_peers.size ()));
if (config.work_peers.empty () && config.work_threads == 0 && !work.opencl)
{
logger.always_log ("Work generation is disabled");
}

if (config.logging.node_lifetime_tracing ())
{
Expand Down Expand Up @@ -681,6 +686,7 @@ void nano::node::stop ()
stats.stop ();
write_database_queue.stop ();
worker.stop ();
// work pool is not stopped on purpose due to testing setup
}
}

Expand Down Expand Up @@ -953,6 +959,10 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
{
assert (node_a != nullptr);
}
~distributed_work ()
{
stop (true);
}
distributed_work (unsigned int backoff_a, std::shared_ptr<nano::node> const & node_a, nano::block_hash const & root_a, std::function<void(uint64_t)> const & callback_a, uint64_t difficulty_a) :
callback (callback_a),
backoff (backoff_a),
Expand All @@ -962,7 +972,7 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
difficulty (difficulty_a)
{
assert (node_a != nullptr);
completed.clear ();
assert (!completed);
}
void start ()
{
Expand Down Expand Up @@ -1004,88 +1014,103 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
}
void start_work ()
{
auto this_l (shared_from_this ());

// Start work generation if peers are not acting correctly, or if there are no peers configured
if ((outstanding.empty () || node->unresponsive_work_peers) && (node->config.work_threads != 0 || node->work.opencl))
{
local_generation_started = true;
node->work.generate (this_l->root, [this_l](boost::optional<uint64_t> const & work_a) {
if (work_a)
{
this_l->set_once (work_a.value ());
this_l->stop (false);
}
},
difficulty);
}

if (!outstanding.empty ())
{
auto this_l (shared_from_this ());
std::lock_guard<std::mutex> lock (mutex);
std::lock_guard<std::mutex> guard (mutex);
for (auto const & i : outstanding)
{
auto host (i.first);
auto service (i.second);
node->background ([this_l, host, service]() {
auto connection (std::make_shared<work_request> (this_l->node->io_ctx, host, service));
connection->socket.async_connect (nano::tcp_endpoint (host, service), [this_l, connection](boost::system::error_code const & ec) {
if (!ec)
auto connection (std::make_shared<work_request> (this_l->node->io_ctx, host, service));
connections.push_back (connection);
connection->socket.async_connect (nano::tcp_endpoint (host, service), [this_l, connection](boost::system::error_code const & ec) {
if (!ec)
{
std::string request_string;
{
std::string request_string;
boost::property_tree::ptree request;
request.put ("action", "work_generate");
request.put ("hash", this_l->root.to_string ());
request.put ("difficulty", nano::to_string_hex (this_l->difficulty));
std::stringstream ostream;
boost::property_tree::write_json (ostream, request);
request_string = ostream.str ();
}
auto request (std::make_shared<boost::beast::http::request<boost::beast::http::string_body>> ());
request->method (boost::beast::http::verb::post);
request->set (boost::beast::http::field::content_type, "application/json");
request->target ("/");
request->version (11);
request->body () = request_string;
request->prepare_payload ();
boost::beast::http::async_write (connection->socket, *request, [this_l, connection, request](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
{
boost::property_tree::ptree request;
request.put ("action", "work_generate");
request.put ("hash", this_l->root.to_string ());
request.put ("difficulty", nano::to_string_hex (this_l->difficulty));
std::stringstream ostream;
boost::property_tree::write_json (ostream, request);
request_string = ostream.str ();
}
auto request (std::make_shared<boost::beast::http::request<boost::beast::http::string_body>> ());
request->method (boost::beast::http::verb::post);
request->set (boost::beast::http::field::content_type, "application/json");
request->target ("/");
request->version (11);
request->body () = request_string;
request->prepare_payload ();
boost::beast::http::async_write (connection->socket, *request, [this_l, connection, request](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
{
boost::beast::http::async_read (connection->socket, connection->buffer, connection->response, [this_l, connection](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
boost::beast::http::async_read (connection->socket, connection->buffer, connection->response, [this_l, connection](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
{
if (connection->response.result () == boost::beast::http::status::ok)
{
if (connection->response.result () == boost::beast::http::status::ok)
{
this_l->success (connection->response.body (), connection->address);
}
else
{
this_l->node->logger.try_log (boost::str (boost::format ("Work peer responded with an error %1% %2%: %3%") % connection->address % connection->port % connection->response.result ()));
this_l->failure (connection->address);
}
this_l->success (connection->response.body (), connection->address);
}
else
{
this_l->node->logger.try_log (boost::str (boost::format ("Unable to read from work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->node->logger.try_log (boost::str (boost::format ("Work peer responded with an error %1% %2%: %3%") % connection->address % connection->port % connection->response.result ()));
this_l->failure (connection->address);
}
});
}
else
{
this_l->node->logger.try_log (boost::str (boost::format ("Unable to write to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->failure (connection->address);
}
});
}
else
{
this_l->node->logger.try_log (boost::str (boost::format ("Unable to connect to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->failure (connection->address);
}
});
}
else if (ec == boost::system::errc::operation_canceled)
{
// The only case where we send a cancel is if we preempt stopped waiting for the response
this_l->cancel (connection);
this_l->failure (connection->address);
}
else
{
this_l->node->logger.try_log (boost::str (boost::format ("Unable to read from work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->failure (connection->address);
}
});
}
else
{
this_l->node->logger.try_log (boost::str (boost::format ("Unable to write to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->failure (connection->address);
}
});
}
else
{
this_l->node->logger.try_log (boost::str (boost::format ("Unable to connect to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->failure (connection->address);
}
});
}
}
else
{
handle_failure (true);
}
}
void stop ()
void cancel (std::shared_ptr<work_request> connection)
{
auto this_l (shared_from_this ());
std::lock_guard<std::mutex> lock (mutex);
for (auto const & i : outstanding)
{
auto host (i.first);
node->background ([this_l, host]() {
auto cancelling (std::make_shared<work_request> (node->io_ctx, connection->address, connection->port));
cancelling->socket.async_connect (nano::tcp_endpoint (cancelling->address, cancelling->port), [this_l, cancelling](boost::system::error_code const & ec) {
if (!ec)
{
std::string request_string;
{
boost::property_tree::ptree request;
Expand All @@ -1095,19 +1120,56 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
boost::property_tree::write_json (ostream, request);
request_string = ostream.str ();
}
boost::beast::http::request<boost::beast::http::string_body> request;
request.method (boost::beast::http::verb::post);
request.set (boost::beast::http::field::content_type, "application/json");
request.target ("/");
request.version (11);
request.body () = request_string;
request.prepare_payload ();
auto socket (std::make_shared<boost::asio::ip::tcp::socket> (this_l->node->io_ctx));
boost::beast::http::async_write (*socket, request, [socket](boost::system::error_code const & ec, size_t bytes_transferred) {
auto request (std::make_shared<boost::beast::http::request<boost::beast::http::string_body>> ());
request->method (boost::beast::http::verb::post);
request->set (boost::beast::http::field::content_type, "application/json");
request->target ("/");
request->version (11);
request->body () = request_string;
request->prepare_payload ();

boost::beast::http::async_write (cancelling->socket, *request, [this_l, request, cancelling](boost::system::error_code const & ec, size_t bytes_transferred) {
if (ec)
{
this_l->node->logger.try_log (boost::str (boost::format ("Unable to send work_cancel to work_peer %1% %2%: %3% (%4%)") % cancelling->address % cancelling->port % ec.message () % ec.value ()));
}
});
});
}
});
}
void stop (bool const local_stop)
{
if (!stopped.exchange (true))
{
std::lock_guard<std::mutex> lock (mutex);
if (local_stop && (node->config.work_threads != 0 || node->work.opencl))
{
node->work.cancel (root);
}
for (auto & i : connections)
{
auto connection = i.lock ();
if (connection)
{
boost::system::error_code ec;
connection->socket.cancel (ec);
if (ec)
{
node->logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.message () % ec.value ()));
}
try
{
connection->socket.close ();
}
catch (const boost::system::system_error & ec)
{
node->logger.try_log (boost::str (boost::format ("Error closing socket with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.what () % ec.code ()));
}
}
}
connections.clear ();
outstanding.clear ();
}
outstanding.clear ();
}
void success (std::string const & body_a, boost::asio::ip::address const & address)
{
Expand All @@ -1124,8 +1186,9 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
uint64_t result_difficulty (0);
if (!nano::work_validate (root, work, &result_difficulty) && result_difficulty >= difficulty)
{
node->unresponsive_work_peers = false;
set_once (work);
stop ();
stop (true);
}
else
{
Expand All @@ -1147,7 +1210,7 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
}
void set_once (uint64_t work_a)
{
if (!completed.test_and_set ())
if (!completed.exchange (true))
{
callback (work_a);
}
Expand All @@ -1161,23 +1224,14 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
{
if (last)
{
if (!completed.test_and_set ())
if (!completed)
{
if (node->config.work_threads != 0 || node->work.opencl)
{
auto callback_l (callback);
// clang-format off
node->work.generate (root, [callback_l](boost::optional<uint64_t> const & work_a) {
callback_l (work_a.value ());
},
difficulty);
// clang-format on
}
else
node->unresponsive_work_peers = true;
if (!local_generation_started)
{
if (backoff == 1 && node->config.logging.work_generation_time ())
{
node->logger.try_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", retrying...");
node->logger.always_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", retrying...");
}
auto now (std::chrono::steady_clock::now ());
auto root_l (root);
Expand Down Expand Up @@ -1209,9 +1263,12 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
nano::block_hash root;
std::mutex mutex;
std::map<boost::asio::ip::address, uint16_t> outstanding;
std::vector<std::weak_ptr<work_request>> connections;
std::vector<std::pair<std::string, uint16_t>> need_resolve;
std::atomic_flag completed;
uint64_t difficulty;
std::atomic<bool> completed{ false };
std::atomic<bool> local_generation_started{ false };
std::atomic<bool> stopped{ false };
};
}

Expand Down
Loading