Skip to content

Commit

Permalink
Distributed work fixes (#2230)
Browse files Browse the repository at this point in the history
* Fix work_cancel not being sent and some unreachable peers hanging the requests

* Start local work generation along with work peer requests

* Work cancel done in a new socket

* Log if work generation cannot be performed

* Making sure stop is only called once

* Simplifying connection handling

* Fix bad address from unordered map

* Stop on destructor

* Correctly stopping work generation on node stop

* Use atomic<bool> and exchange

* node.work is not stopped on node.stop due to testing setup

* Local work generation is now only used in case peers are unresponsive

This change enhances the previous behavior. Local work generation is only used after all peers are unresponsive.

A flag is set in the node (unresponsive_work_peers) so that for the next distributed work, local generation will start immediately. Work peer requests are still sent, and as soon as one replies with valid work, local generation is delayed again, until all are unresponsive, and so on.

This is more of a fallback mechanism for when all peers are failing, as the previous behavior would always wait for timeouts on peers (which can be long, 2 minutes here). The only case not handled, for simplicity, is when multiple work requests are queued and the first one has unresponsive peers. In this case, the currently queued work requests will not start work generation immediately; only the next queued distributed work will.

* Also start local generation immediately if there are no work peers

* Robustify

* Unbreak tests

* No need to wrap in node background
  • Loading branch information
guilhermelawless authored and zhyatt committed Aug 23, 2019
1 parent 20b52ac commit ddf4d66
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 107 deletions.
38 changes: 21 additions & 17 deletions nano/lib/work.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void nano::work_pool::loop (uint64_t thread)
blake2b_init (&hash, sizeof (output));
std::unique_lock<std::mutex> lock (mutex);
auto pow_sleep = pow_rate_limiter;
while (!done || !pending.empty ())
while (!done)
{
auto empty (pending.empty ());
if (thread == 0)
Expand Down Expand Up @@ -139,33 +139,37 @@ void nano::work_pool::loop (uint64_t thread)
void nano::work_pool::cancel (nano::uint256_union const & root_a)
{
std::lock_guard<std::mutex> lock (mutex);
if (!pending.empty ())
if (!done)
{
if (pending.front ().item == root_a)
if (!pending.empty ())
{
++ticket;
if (pending.front ().item == root_a)
{
++ticket;
}
}
pending.remove_if ([&root_a](decltype (pending)::value_type const & item_a) {
bool result;
if (item_a.item == root_a)
{
item_a.callback (boost::none);
result = true;
}
else
{
result = false;
}
return result;
});
}
pending.remove_if ([&root_a](decltype (pending)::value_type const & item_a) {
bool result;
if (item_a.item == root_a)
{
item_a.callback (boost::none);
result = true;
}
else
{
result = false;
}
return result;
});
}

/**
 * Flags the pool as finished and wakes every thread waiting on producer_condition.
 * The ticket is bumped together with the done flag, under the same lock
 * (presumably so workers abandon the item currently in progress - confirm against loop ()).
 */
void nano::work_pool::stop ()
{
	std::unique_lock<std::mutex> guard (mutex);
	done = true;
	++ticket;
	// Release the mutex before notifying so woken threads can take it straight away
	guard.unlock ();
	producer_condition.notify_all ();
}
Expand Down
237 changes: 147 additions & 90 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,11 @@ startup_time (std::chrono::steady_clock::now ())
logger.always_log ("Active network: ", network_label);

logger.always_log (boost::str (boost::format ("Work pool running %1% threads") % work.threads.size ()));
logger.always_log (boost::str (boost::format ("%1% work peers configured") % config.work_peers.size ()));
if (config.work_peers.empty () && config.work_threads == 0 && !work.opencl)
{
logger.always_log ("Work generation is disabled");
}

if (config.logging.node_lifetime_tracing ())
{
Expand Down Expand Up @@ -681,6 +686,7 @@ void nano::node::stop ()
stats.stop ();
write_database_queue.stop ();
worker.stop ();
// work pool is not stopped on purpose due to testing setup
}
}

Expand Down Expand Up @@ -953,6 +959,10 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
{
assert (node_a != nullptr);
}
// Tear down on destruction: stop (true) closes any tracked peer connections and, when local
// work generation is available, cancels local work for this root. stop () only acts on its
// first invocation, so this is a no-op if the request was already stopped.
~distributed_work ()
{
stop (true);
}
distributed_work (unsigned int backoff_a, std::shared_ptr<nano::node> const & node_a, nano::block_hash const & root_a, std::function<void(uint64_t)> const & callback_a, uint64_t difficulty_a) :
callback (callback_a),
backoff (backoff_a),
Expand All @@ -962,7 +972,7 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
difficulty (difficulty_a)
{
assert (node_a != nullptr);
completed.clear ();
assert (!completed);
}
void start ()
{
Expand Down Expand Up @@ -1004,88 +1014,103 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
}
void start_work ()
{
auto this_l (shared_from_this ());

// Start work generation if peers are not acting correctly, or if there are no peers configured
if ((outstanding.empty () || node->unresponsive_work_peers) && (node->config.work_threads != 0 || node->work.opencl))
{
local_generation_started = true;
node->work.generate (this_l->root, [this_l](boost::optional<uint64_t> const & work_a) {
if (work_a)
{
this_l->set_once (work_a.value ());
this_l->stop (false);
}
},
difficulty);
}

if (!outstanding.empty ())
{
auto this_l (shared_from_this ());
std::lock_guard<std::mutex> lock (mutex);
std::lock_guard<std::mutex> guard (mutex);
for (auto const & i : outstanding)
{
auto host (i.first);
auto service (i.second);
node->background ([this_l, host, service]() {
auto connection (std::make_shared<work_request> (this_l->node->io_ctx, host, service));
connection->socket.async_connect (nano::tcp_endpoint (host, service), [this_l, connection](boost::system::error_code const & ec) {
if (!ec)
auto connection (std::make_shared<work_request> (this_l->node->io_ctx, host, service));
connections.push_back (connection);
connection->socket.async_connect (nano::tcp_endpoint (host, service), [this_l, connection](boost::system::error_code const & ec) {
if (!ec)
{
std::string request_string;
{
std::string request_string;
boost::property_tree::ptree request;
request.put ("action", "work_generate");
request.put ("hash", this_l->root.to_string ());
request.put ("difficulty", nano::to_string_hex (this_l->difficulty));
std::stringstream ostream;
boost::property_tree::write_json (ostream, request);
request_string = ostream.str ();
}
auto request (std::make_shared<boost::beast::http::request<boost::beast::http::string_body>> ());
request->method (boost::beast::http::verb::post);
request->set (boost::beast::http::field::content_type, "application/json");
request->target ("/");
request->version (11);
request->body () = request_string;
request->prepare_payload ();
boost::beast::http::async_write (connection->socket, *request, [this_l, connection, request](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
{
boost::property_tree::ptree request;
request.put ("action", "work_generate");
request.put ("hash", this_l->root.to_string ());
request.put ("difficulty", nano::to_string_hex (this_l->difficulty));
std::stringstream ostream;
boost::property_tree::write_json (ostream, request);
request_string = ostream.str ();
}
auto request (std::make_shared<boost::beast::http::request<boost::beast::http::string_body>> ());
request->method (boost::beast::http::verb::post);
request->set (boost::beast::http::field::content_type, "application/json");
request->target ("/");
request->version (11);
request->body () = request_string;
request->prepare_payload ();
boost::beast::http::async_write (connection->socket, *request, [this_l, connection, request](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
{
boost::beast::http::async_read (connection->socket, connection->buffer, connection->response, [this_l, connection](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
boost::beast::http::async_read (connection->socket, connection->buffer, connection->response, [this_l, connection](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
{
if (connection->response.result () == boost::beast::http::status::ok)
{
if (connection->response.result () == boost::beast::http::status::ok)
{
this_l->success (connection->response.body (), connection->address);
}
else
{
this_l->node->logger.try_log (boost::str (boost::format ("Work peer responded with an error %1% %2%: %3%") % connection->address % connection->port % connection->response.result ()));
this_l->failure (connection->address);
}
this_l->success (connection->response.body (), connection->address);
}
else
{
this_l->node->logger.try_log (boost::str (boost::format ("Unable to read from work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->node->logger.try_log (boost::str (boost::format ("Work peer responded with an error %1% %2%: %3%") % connection->address % connection->port % connection->response.result ()));
this_l->failure (connection->address);
}
});
}
else
{
this_l->node->logger.try_log (boost::str (boost::format ("Unable to write to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->failure (connection->address);
}
});
}
else
{
this_l->node->logger.try_log (boost::str (boost::format ("Unable to connect to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->failure (connection->address);
}
});
}
else if (ec == boost::system::errc::operation_canceled)
{
// The only case where we send a cancel is if we preempt stopped waiting for the response
this_l->cancel (connection);
this_l->failure (connection->address);
}
else
{
this_l->node->logger.try_log (boost::str (boost::format ("Unable to read from work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->failure (connection->address);
}
});
}
else
{
this_l->node->logger.try_log (boost::str (boost::format ("Unable to write to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->failure (connection->address);
}
});
}
else
{
this_l->node->logger.try_log (boost::str (boost::format ("Unable to connect to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->failure (connection->address);
}
});
}
}
else
{
handle_failure (true);
}
}
void stop ()
void cancel (std::shared_ptr<work_request> connection)
{
auto this_l (shared_from_this ());
std::lock_guard<std::mutex> lock (mutex);
for (auto const & i : outstanding)
{
auto host (i.first);
node->background ([this_l, host]() {
auto cancelling (std::make_shared<work_request> (node->io_ctx, connection->address, connection->port));
cancelling->socket.async_connect (nano::tcp_endpoint (cancelling->address, cancelling->port), [this_l, cancelling](boost::system::error_code const & ec) {
if (!ec)
{
std::string request_string;
{
boost::property_tree::ptree request;
Expand All @@ -1095,19 +1120,56 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
boost::property_tree::write_json (ostream, request);
request_string = ostream.str ();
}
boost::beast::http::request<boost::beast::http::string_body> request;
request.method (boost::beast::http::verb::post);
request.set (boost::beast::http::field::content_type, "application/json");
request.target ("/");
request.version (11);
request.body () = request_string;
request.prepare_payload ();
auto socket (std::make_shared<boost::asio::ip::tcp::socket> (this_l->node->io_ctx));
boost::beast::http::async_write (*socket, request, [socket](boost::system::error_code const & ec, size_t bytes_transferred) {
auto request (std::make_shared<boost::beast::http::request<boost::beast::http::string_body>> ());
request->method (boost::beast::http::verb::post);
request->set (boost::beast::http::field::content_type, "application/json");
request->target ("/");
request->version (11);
request->body () = request_string;
request->prepare_payload ();

boost::beast::http::async_write (cancelling->socket, *request, [this_l, request, cancelling](boost::system::error_code const & ec, size_t bytes_transferred) {
if (ec)
{
this_l->node->logger.try_log (boost::str (boost::format ("Unable to send work_cancel to work_peer %1% %2%: %3% (%4%)") % cancelling->address % cancelling->port % ec.message () % ec.value ()));
}
});
});
}
});
}
void stop (bool const local_stop)
{
if (!stopped.exchange (true))
{
std::lock_guard<std::mutex> lock (mutex);
if (local_stop && (node->config.work_threads != 0 || node->work.opencl))
{
node->work.cancel (root);
}
for (auto & i : connections)
{
auto connection = i.lock ();
if (connection)
{
boost::system::error_code ec;
connection->socket.cancel (ec);
if (ec)
{
node->logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.message () % ec.value ()));
}
try
{
connection->socket.close ();
}
catch (const boost::system::system_error & ec)
{
node->logger.try_log (boost::str (boost::format ("Error closing socket with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.what () % ec.code ()));
}
}
}
connections.clear ();
outstanding.clear ();
}
outstanding.clear ();
}
void success (std::string const & body_a, boost::asio::ip::address const & address)
{
Expand All @@ -1124,8 +1186,9 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
uint64_t result_difficulty (0);
if (!nano::work_validate (root, work, &result_difficulty) && result_difficulty >= difficulty)
{
node->unresponsive_work_peers = false;
set_once (work);
stop ();
stop (true);
}
else
{
Expand All @@ -1147,7 +1210,7 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
}
void set_once (uint64_t work_a)
{
if (!completed.test_and_set ())
if (!completed.exchange (true))
{
callback (work_a);
}
Expand All @@ -1161,23 +1224,14 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
{
if (last)
{
if (!completed.test_and_set ())
if (!completed)
{
if (node->config.work_threads != 0 || node->work.opencl)
{
auto callback_l (callback);
// clang-format off
node->work.generate (root, [callback_l](boost::optional<uint64_t> const & work_a) {
callback_l (work_a.value ());
},
difficulty);
// clang-format on
}
else
node->unresponsive_work_peers = true;
if (!local_generation_started)
{
if (backoff == 1 && node->config.logging.work_generation_time ())
{
node->logger.try_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", retrying...");
node->logger.always_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", retrying...");
}
auto now (std::chrono::steady_clock::now ());
auto root_l (root);
Expand Down Expand Up @@ -1209,9 +1263,12 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
nano::block_hash root;
std::mutex mutex;
std::map<boost::asio::ip::address, uint16_t> outstanding;
std::vector<std::weak_ptr<work_request>> connections;
std::vector<std::pair<std::string, uint16_t>> need_resolve;
std::atomic_flag completed;
uint64_t difficulty;
std::atomic<bool> completed{ false };
std::atomic<bool> local_generation_started{ false };
std::atomic<bool> stopped{ false };
};
}

Expand Down
Loading

0 comments on commit ddf4d66

Please sign in to comment.