Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed work fixes #2230

Merged
merged 17 commits into from
Aug 23, 2019
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions nano/lib/work.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void nano::work_pool::loop (uint64_t thread)
blake2b_init (&hash, sizeof (output));
std::unique_lock<std::mutex> lock (mutex);
auto pow_sleep = pow_rate_limiter;
while (!done || !pending.empty ())
while (!done)
{
auto empty (pending.empty ());
if (thread == 0)
Expand Down Expand Up @@ -139,33 +139,37 @@ void nano::work_pool::loop (uint64_t thread)
void nano::work_pool::cancel (nano::uint256_union const & root_a)
{
std::lock_guard<std::mutex> lock (mutex);
if (!pending.empty ())
if (!done)
{
if (pending.front ().item == root_a)
if (!pending.empty ())
{
++ticket;
if (pending.front ().item == root_a)
{
++ticket;
}
}
pending.remove_if ([&root_a](decltype (pending)::value_type const & item_a) {
bool result;
if (item_a.item == root_a)
{
item_a.callback (boost::none);
result = true;
}
else
{
result = false;
}
return result;
});
}
pending.remove_if ([&root_a](decltype (pending)::value_type const & item_a) {
bool result;
if (item_a.item == root_a)
{
item_a.callback (boost::none);
result = true;
}
else
{
result = false;
}
return result;
});
}

// Requests shutdown of the work pool: under the pool mutex, sets the done flag
// and increments the ticket, then wakes every thread waiting on producer_condition.
void nano::work_pool::stop ()
{
	std::unique_lock<std::mutex> guard (mutex);
	done = true;
	++ticket;
	// Drop the mutex before notifying so the notification is sent without holding the lock
	guard.unlock ();
	producer_condition.notify_all ();
}
Expand Down
154 changes: 106 additions & 48 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,11 @@ startup_time (std::chrono::steady_clock::now ())
logger.always_log ("Active network: ", network_label);

logger.always_log (boost::str (boost::format ("Work pool running %1% threads") % work.threads.size ()));
logger.always_log (boost::str (boost::format ("%1% work peers configured") % config.work_peers.size ()));
if (config.work_peers.empty () && config.work_threads == 0 && !work.opencl)
{
logger.always_log ("Work generation is disabled");
}

if (config.logging.node_lifetime_tracing ())
{
Expand Down Expand Up @@ -659,6 +664,7 @@ void nano::node::stop ()
if (!stopped.exchange (true))
{
logger.always_log ("Node stopping");
work.stop ();
block_processor.stop ();
if (block_processor_thread.joinable ())
{
Expand Down Expand Up @@ -943,6 +949,10 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
{
assert (node_a != nullptr);
}
// Destructor: performs a local stop. Per stop (bool), this cancels local work
// generation for the root (when local generation is enabled) and cancels/closes
// any work peer connections that are still alive.
~distributed_work ()
{
	stop (true);
}
distributed_work (unsigned int backoff_a, std::shared_ptr<nano::node> const & node_a, nano::block_hash const & root_a, std::function<void(uint64_t)> const & callback_a, uint64_t difficulty_a) :
callback (callback_a),
backoff (backoff_a),
Expand Down Expand Up @@ -994,16 +1004,18 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
}
void start_work ()
{
auto this_l (shared_from_this ());

if (!outstanding.empty ())
{
auto this_l (shared_from_this ());
std::lock_guard<std::mutex> lock (mutex);
for (auto const & i : outstanding)
{
auto host (i.first);
auto service (i.second);
node->background ([this_l, host, service]() {
auto connection (std::make_shared<work_request> (this_l->node->io_ctx, host, service));
this_l->add (connection);
connection->socket.async_connect (nano::tcp_endpoint (host, service), [this_l, connection](boost::system::error_code const & ec) {
if (!ec)
{
Expand Down Expand Up @@ -1040,6 +1052,12 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
this_l->failure (connection->address);
}
}
else if (ec == boost::system::errc::operation_canceled)
{
// The only case where we send a cancel is if we preemptively stopped waiting for the response
this_l->cancel (connection);
this_l->failure (connection->address);
}
else
{
this_l->node->logger.try_log (boost::str (boost::format ("Unable to read from work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
Expand All @@ -1063,19 +1081,27 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
});
}
}
else
if (node->config.work_threads != 0 || node->work.opencl)
{
handle_failure (true);
// clang-format off
node->work.generate (root, [this_l](boost::optional<uint64_t> const & work_a) {
if (work_a)
{
this_l->set_once (work_a.value ());
this_l->stop (false);
}
},
difficulty);
// clang-format on
}
}
void stop ()
void cancel (std::shared_ptr<work_request> connection)
{
auto this_l (shared_from_this ());
std::lock_guard<std::mutex> lock (mutex);
for (auto const & i : outstanding)
{
auto host (i.first);
node->background ([this_l, host]() {
auto cancelling (std::make_shared<work_request> (node->io_ctx, connection->address, connection->port));
cancelling->socket.async_connect (nano::tcp_endpoint (cancelling->address, cancelling->port), [this_l, cancelling](boost::system::error_code const & ec) {
if (!ec)
{
std::string request_string;
{
boost::property_tree::ptree request;
Expand All @@ -1085,19 +1111,57 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
boost::property_tree::write_json (ostream, request);
request_string = ostream.str ();
}
boost::beast::http::request<boost::beast::http::string_body> request;
request.method (boost::beast::http::verb::post);
request.set (boost::beast::http::field::content_type, "application/json");
request.target ("/");
request.version (11);
request.body () = request_string;
request.prepare_payload ();
auto socket (std::make_shared<boost::asio::ip::tcp::socket> (this_l->node->io_ctx));
boost::beast::http::async_write (*socket, request, [socket](boost::system::error_code const & ec, size_t bytes_transferred) {
auto request (std::make_shared<boost::beast::http::request<boost::beast::http::string_body>> ());
request->method (boost::beast::http::verb::post);
request->set (boost::beast::http::field::content_type, "application/json");
request->target ("/");
request->version (11);
request->body () = request_string;
request->prepare_payload ();

boost::beast::http::async_write (cancelling->socket, *request, [this_l, request, cancelling](boost::system::error_code const & ec, size_t bytes_transferred) {
if (ec)
{
this_l->node->logger.try_log (boost::str (boost::format ("Unable to send work_cancel to work_peer %1% %2%: %3% (%4%)") % cancelling->address % cancelling->port % ec.message () % ec.value ()));
}
});
});
}
});
}
void stop (bool const local_stop)
{
std::lock_guard<std::mutex> lock (mutex);
if (!stopped)
guilhermelawless marked this conversation as resolved.
Show resolved Hide resolved
{
stopped = true;
if (local_stop && (node->config.work_threads != 0 || node->work.opencl))
{
node->work.cancel (root);
}
for (auto & i : connections)
{
auto connection = i.lock ();
if (connection)
{
boost::system::error_code ec;
connection->socket.cancel (ec);
if (ec)
{
node->logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.message () % ec.value ()));
}
try
{
connection->socket.close ();
}
catch (const boost::system::system_error & ec)
{
node->logger.try_log (boost::str (boost::format ("Error closing socket with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.what () % ec.code ()));
}
}
}
connections.clear ();
outstanding.clear ();
}
outstanding.clear ();
}
void success (std::string const & body_a, boost::asio::ip::address const & address)
{
Expand All @@ -1115,7 +1179,7 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
if (!nano::work_validate (root, work, &result_difficulty) && result_difficulty >= difficulty)
{
set_once (work);
stop ();
stop (true);
}
else
{
Expand Down Expand Up @@ -1153,37 +1217,24 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
{
if (!completed.test_and_set ())
{
if (node->config.work_threads != 0 || node->work.opencl)
if (backoff == 1 && node->config.logging.work_generation_time ())
{
auto callback_l (callback);
// clang-format off
node->work.generate (root, [callback_l](boost::optional<uint64_t> const & work_a) {
callback_l (work_a.value ());
},
difficulty);
// clang-format on
node->logger.try_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", retrying...");
}
else
{
if (backoff == 1 && node->config.logging.work_generation_time ())
auto now (std::chrono::steady_clock::now ());
auto root_l (root);
auto callback_l (callback);
std::weak_ptr<nano::node> node_w (node);
auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5));
// clang-format off
node->alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l, callback_l, next_backoff, difficulty = difficulty ] {
if (auto node_l = node_w.lock ())
{
node->logger.try_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", retrying...");
auto work_generation (std::make_shared<distributed_work> (next_backoff, node_l, root_l, callback_l, difficulty));
work_generation->start ();
}
auto now (std::chrono::steady_clock::now ());
auto root_l (root);
auto callback_l (callback);
std::weak_ptr<nano::node> node_w (node);
auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5));
// clang-format off
node->alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l, callback_l, next_backoff, difficulty = difficulty ] {
if (auto node_l = node_w.lock ())
{
auto work_generation (std::make_shared<distributed_work> (next_backoff, node_l, root_l, callback_l, difficulty));
work_generation->start ();
}
});
// clang-format on
}
});
// clang-format on
}
}
}
Expand All @@ -1193,15 +1244,22 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
outstanding.erase (address);
return outstanding.empty ();
}
// Records a work peer connection so that stop () can later cancel and close it.
// Only a weak reference is stored, so tracking does not keep the connection alive.
void add (std::shared_ptr<work_request> & connection)
{
	std::lock_guard<std::mutex> guard (mutex);
	connections.emplace_back (connection);
}
std::function<void(uint64_t)> callback;
unsigned int backoff; // in seconds
std::shared_ptr<nano::node> node;
nano::block_hash root;
std::mutex mutex;
std::map<boost::asio::ip::address, uint16_t> outstanding;
std::vector<std::weak_ptr<work_request>> connections;
std::vector<std::pair<std::string, uint16_t>> need_resolve;
std::atomic_flag completed;
uint64_t difficulty;
std::atomic_bool stopped{ false };
guilhermelawless marked this conversation as resolved.
Show resolved Hide resolved
};
}

Expand Down