From 6f173913f4725ac921e83c6e141153dc31468a01 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Tue, 20 Aug 2019 12:18:31 +0100 Subject: [PATCH 01/16] Fix work_cancel not being sent and some unreachable peers hanging the requests --- nano/node/node.cpp | 103 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 82 insertions(+), 21 deletions(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 5a7680b1c7..fc98cb3498 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -934,6 +934,7 @@ class work_request boost::beast::flat_buffer buffer; boost::beast::http::response response; boost::asio::ip::tcp::socket socket; + std::atomic_bool awaiting_work{ false }; }; class distributed_work : public std::enable_shared_from_this { @@ -1004,6 +1005,7 @@ class distributed_work : public std::enable_shared_from_this auto service (i.second); node->background ([this_l, host, service]() { auto connection (std::make_shared (this_l->node->io_ctx, host, service)); + this_l->connecting (host, connection); connection->socket.async_connect (nano::tcp_endpoint (host, service), [this_l, connection](boost::system::error_code const & ec) { if (!ec) { @@ -1027,9 +1029,11 @@ class distributed_work : public std::enable_shared_from_this boost::beast::http::async_write (connection->socket, *request, [this_l, connection, request](boost::system::error_code const & ec, size_t bytes_transferred) { if (!ec) { + this_l->requested (connection->address); boost::beast::http::async_read (connection->socket, connection->buffer, connection->response, [this_l, connection](boost::system::error_code const & ec, size_t bytes_transferred) { if (!ec) { + this_l->received (connection->address); if (connection->response.result () == boost::beast::http::status::ok) { this_l->success (connection->response.body (), connection->address); @@ -1040,6 +1044,38 @@ class distributed_work : public std::enable_shared_from_this this_l->failure (connection->address); } } + else if (ec == boost::system::errc::operation_canceled) + { + connection->socket.async_wait (boost::asio::socket_base::wait_write, [this_l, connection](boost::system::error_code const & ec) { + if (!ec) + { + std::string request_string; + { + boost::property_tree::ptree request; + request.put ("action", "work_cancel"); + request.put ("hash", this_l->root.to_string ()); + std::stringstream ostream; + boost::property_tree::write_json (ostream, request); + request_string = ostream.str (); + } + auto request (std::make_shared> ()); + request->method (boost::beast::http::verb::post); + request->set (boost::beast::http::field::content_type, "application/json"); + request->target ("/"); + request->version (11); + request->body () = request_string; + request->prepare_payload (); + + boost::beast::http::async_write (connection->socket, *request, [this_l, request, connection](boost::system::error_code const & ec, size_t bytes_transferred) { + if (ec) + { + this_l->node->logger.try_log (boost::str (boost::format ("Unable to send work_cancel to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); + } + }); + } + }); + this_l->failure (connection->address); + } else { this_l->node->logger.try_log (boost::str (boost::format ("Unable to read from work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); @@ -1070,33 +1106,34 @@ class distributed_work : public std::enable_shared_from_this } void stop () { + stopped = true; auto this_l (shared_from_this ()); std::lock_guard lock (mutex); - for (auto const & i : outstanding) + for (auto & i : connections) { - auto host (i.first); - node->background ([this_l, host]() { - std::string request_string; + auto connection = i.second.lock (); + if (connection && connection->awaiting_work) + { + boost::system::error_code ec; + connection->socket.cancel (ec); + if (ec) { - boost::property_tree::ptree request; - request.put ("action", "work_cancel"); - request.put ("hash", this_l->root.to_string ()); - std::stringstream ostream; - boost::property_tree::write_json (ostream, request); - request_string = ostream.str (); + this_l->node->logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.message () % ec.value ())); } - boost::beast::http::request request; - request.method (boost::beast::http::verb::post); - request.set (boost::beast::http::field::content_type, "application/json"); - request.target ("/"); - request.version (11); - request.body () = request_string; - request.prepare_payload (); - auto socket (std::make_shared (this_l->node->io_ctx)); - boost::beast::http::async_write (*socket, request, [socket](boost::system::error_code const & ec, size_t bytes_transferred) { - }); - }); + } + else if (connection) + { + try + { + connection->socket.close (); + } + catch (const boost::system::system_error & ec) + { + this_l->node->logger.try_log (boost::str (boost::format ("Error closing socket with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.what () % ec.code ())); + } + } } + connections.clear (); outstanding.clear (); } void success (std::string const & body_a, boost::asio::ip::address const & address) @@ -1193,15 +1230,39 @@ class distributed_work : public std::enable_shared_from_this outstanding.erase (address); return outstanding.empty (); } + void connecting (boost::asio::ip::address const & address, std::shared_ptr & connection) + { + std::lock_guard lock (mutex); + connections[address] = connection; + } + void requested (boost::asio::ip::address const & address) + { + std::lock_guard lock (mutex); + auto existing (connections.find (address)); + if (existing != connections.end ()) + { + if (auto connection = existing->second.lock ()) + { + connection->awaiting_work = true; + } + } + } + void received (boost::asio::ip::address const & address) + { + std::lock_guard lock (mutex); + connections.erase (address); + } std::function callback; unsigned int backoff; // in seconds std::shared_ptr node; nano::block_hash root; std::mutex mutex; std::map outstanding; + std::map> connections; std::vector> need_resolve; std::atomic_flag completed; uint64_t difficulty; + bool stopped{ false }; }; } From bdcc24d142a85c62b2501c586148960c458a7cf3 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Tue, 20 Aug 2019 12:51:55 +0100 Subject: [PATCH 02/16] Start local work generation along with work peer requests --- nano/node/node.cpp | 64 +++++++++++++++++++++++----------------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index fc98cb3498..b9660e4acc 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -995,9 +995,10 @@ class distributed_work : public std::enable_shared_from_this } void start_work () { + auto this_l (shared_from_this ()); + if (!outstanding.empty ()) { - auto this_l (shared_from_this ()); std::lock_guard lock (mutex); for (auto const & i : outstanding) { @@ -1099,9 +1100,17 @@ class distributed_work : public std::enable_shared_from_this }); } } - else + if (node->config.work_threads != 0 || node->work.opencl) { - handle_failure (true); + // clang-format off + node->work.generate (root, [this_l](boost::optional const & work_a) { + if (work_a) + { + this_l->set_once (work_a.value ()); + } + }, + difficulty); + // clang-format on } } void stop () @@ -1121,7 +1130,7 @@ class distributed_work : public std::enable_shared_from_this this_l->node->logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.message () % ec.value ())); } } - else if (connection) + else if (connection) // connection can be hanging { try { @@ -1152,7 +1161,10 @@ class distributed_work : public std::enable_shared_from_this if (!nano::work_validate (root, work, &result_difficulty) && result_difficulty >= difficulty) { set_once (work); - stop (); + if (node->config.work_threads != 0 || node->work.opencl) + { + node->work.cancel (root); + } } else { @@ -1177,6 +1189,7 @@ class distributed_work : public std::enable_shared_from_this if (!completed.test_and_set ()) { callback (work_a); + stop (); } } void failure (boost::asio::ip::address const & address) @@ -1190,37 +1203,24 @@ class distributed_work : public std::enable_shared_from_this { if (!completed.test_and_set ()) { - if (node->config.work_threads != 0 || node->work.opencl) + if (backoff == 1 && node->config.logging.work_generation_time ()) { - auto callback_l (callback); - // clang-format off - node->work.generate (root, [callback_l](boost::optional const & work_a) { - callback_l (work_a.value ()); - }, - difficulty); - // clang-format on + node->logger.try_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", retrying..."); } - else - { - if (backoff == 1 && node->config.logging.work_generation_time ()) + auto now (std::chrono::steady_clock::now ()); + auto root_l (root); + auto callback_l (callback); + std::weak_ptr node_w (node); + auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5)); + // clang-format off + node->alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l, callback_l, next_backoff, difficulty = difficulty ] { + if (auto node_l = node_w.lock ()) { - node->logger.try_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", retrying..."); + auto work_generation (std::make_shared (next_backoff, node_l, root_l, callback_l, difficulty)); + work_generation->start (); } - auto now (std::chrono::steady_clock::now ()); - auto root_l (root); - auto callback_l (callback); - std::weak_ptr node_w (node); - auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5)); - // clang-format off - node->alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l, callback_l, next_backoff, difficulty = difficulty ] { - if (auto node_l = node_w.lock ()) - { - auto work_generation (std::make_shared (next_backoff, node_l, root_l, callback_l, difficulty)); - work_generation->start (); - } - }); - // clang-format on - } + }); + // clang-format on } } } From 2dc73b488e833b298cf2a6b3b3b0c528ed42ee1f Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Tue, 20 Aug 2019 13:29:26 +0100 Subject: [PATCH 03/16] Work cancel done in a new socket --- nano/node/node.cpp | 64 +++++++++++++++++++++++++--------------------- 1 file changed, 35 insertions(+), 29 deletions(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index b9660e4acc..40a17207ed 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1047,34 +1047,7 @@ class distributed_work : public std::enable_shared_from_this } else if (ec == boost::system::errc::operation_canceled) { - connection->socket.async_wait (boost::asio::socket_base::wait_write, [this_l, connection](boost::system::error_code const & ec) { - if (!ec) - { - std::string request_string; - { - boost::property_tree::ptree request; - request.put ("action", "work_cancel"); - request.put ("hash", this_l->root.to_string ()); - std::stringstream ostream; - boost::property_tree::write_json (ostream, request); - request_string = ostream.str (); - } - auto request (std::make_shared> ()); - request->method (boost::beast::http::verb::post); - request->set (boost::beast::http::field::content_type, "application/json"); - request->target ("/"); - request->version (11); - request->body () = request_string; - request->prepare_payload (); - - boost::beast::http::async_write (connection->socket, *request, [this_l, request, connection](boost::system::error_code const & ec, size_t bytes_transferred) { - if (ec) - { - this_l->node->logger.try_log (boost::str (boost::format ("Unable to send work_cancel to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); - } - }); - } - }); + this_l->cancel (connection); this_l->failure (connection->address); } else @@ -1113,6 +1086,39 @@ class distributed_work : public std::enable_shared_from_this // clang-format on } } + void cancel (std::shared_ptr connection) + { + auto this_l (shared_from_this ()); + auto cancelling (std::make_shared (node->io_ctx, connection->address, connection->port)); + cancelling->socket.async_connect (nano::tcp_endpoint (cancelling->address, cancelling->port), [this_l, cancelling](boost::system::error_code const & ec) { + if (!ec) + { + std::string request_string; + { + boost::property_tree::ptree request; + request.put ("action", "work_cancel"); + request.put ("hash", this_l->root.to_string ()); + std::stringstream ostream; + boost::property_tree::write_json (ostream, request); + request_string = ostream.str (); + } + auto request (std::make_shared> ()); + request->method (boost::beast::http::verb::post); + request->set (boost::beast::http::field::content_type, "application/json"); + request->target ("/"); + request->version (11); + request->body () = request_string; + request->prepare_payload (); + + boost::beast::http::async_write (cancelling->socket, *request, [this_l, request, cancelling](boost::system::error_code const & ec, size_t bytes_transferred) { + if (ec) + { + this_l->node->logger.try_log (boost::str (boost::format ("Unable to send work_cancel to work_peer %1% %2%: %3% (%4%)") % cancelling->address % cancelling->port % ec.message () % ec.value ())); + } + }); + } + }); + } void stop () { stopped = true; @@ -1130,7 +1136,7 @@ class distributed_work : public std::enable_shared_from_this this_l->node->logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.message () % ec.value ())); } } - else if (connection) // connection can be hanging + if (connection) // connection can be hanging { try { From 8e289ca349febf93108359275b2ac51ac1d65b01 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Tue, 20 Aug 2019 14:00:37 +0100 Subject: [PATCH 04/16] Log if work generation cannot be performed --- nano/node/node.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 40a17207ed..40831cf311 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -366,6 +366,11 @@ startup_time (std::chrono::steady_clock::now ()) logger.always_log ("Active network: ", network_label); logger.always_log (boost::str (boost::format ("Work pool running %1% threads") % work.threads.size ())); + logger.always_log (boost::str (boost::format ("%1% work peers configured") % config.work_peers.size ())); + if (config.work_peers.empty () && config.work_threads == 0 && !work.opencl) + { + logger.always_log ("Work generation is disabled"); + } if (config.logging.node_lifetime_tracing ()) { @@ -1136,7 +1141,7 @@ class distributed_work : public std::enable_shared_from_this this_l->node->logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.message () % ec.value ())); } } - if (connection) // connection can be hanging + if (connection) // may be hanging { try { From dabbe5c71b28c0c7b6bfc6b193b7fd61fffb91cb Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Tue, 20 Aug 2019 14:06:10 +0100 Subject: [PATCH 05/16] Making sure stop is only called once --- nano/node/node.cpp | 47 ++++++++++++++++++++++++---------------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 40831cf311..c4d7e276bf 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1126,35 +1126,38 @@ class distributed_work : public std::enable_shared_from_this } void stop () { - stopped = true; - auto this_l (shared_from_this ()); - std::lock_guard lock (mutex); - for (auto & i : connections) + if (!stopped) { - auto connection = i.second.lock (); - if (connection && connection->awaiting_work) - { - boost::system::error_code ec; - connection->socket.cancel (ec); - if (ec) - { - this_l->node->logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.message () % ec.value ())); - } - } - if (connection) // may be hanging + stopped = true; + auto this_l (shared_from_this ()); + std::lock_guard lock (mutex); + for (auto & i : connections) { - try + auto connection = i.second.lock (); + if (connection && connection->awaiting_work) { - connection->socket.close (); + boost::system::error_code ec; + connection->socket.cancel (ec); + if (ec) + { + this_l->node->logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.message () % ec.value ())); + } } - catch (const boost::system::system_error & ec) + if (connection) // may be hanging { - this_l->node->logger.try_log (boost::str (boost::format ("Error closing socket with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.what () % ec.code ())); + try + { + connection->socket.close (); + } + catch (const boost::system::system_error & ec) + { + this_l->node->logger.try_log (boost::str (boost::format ("Error closing socket with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.what () % ec.code ())); + } } } + connections.clear (); + outstanding.clear (); } - connections.clear (); - outstanding.clear (); } void success (std::string const & body_a, boost::asio::ip::address const & address) { @@ -1273,7 +1276,7 @@ class distributed_work : public std::enable_shared_from_this std::vector> need_resolve; std::atomic_flag completed; uint64_t difficulty; - bool stopped{ false }; + std::atomic_bool stopped{ false }; }; } From 4efe2ebfd87b4f248c65a957af1baa381169c189 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Tue, 20 Aug 2019 16:25:34 +0100 Subject: [PATCH 06/16] Simplifying connection handling --- nano/node/node.cpp | 38 ++++++++------------------------------ 1 file changed, 8 insertions(+), 30 deletions(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index c4d7e276bf..bcf8a1c8b2 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -939,7 +939,6 @@ class work_request boost::beast::flat_buffer buffer; boost::beast::http::response response; boost::asio::ip::tcp::socket socket; - std::atomic_bool awaiting_work{ false }; }; class distributed_work : public std::enable_shared_from_this { @@ -1011,7 +1010,7 @@ class distributed_work : public std::enable_shared_from_this auto service (i.second); node->background ([this_l, host, service]() { auto connection (std::make_shared (this_l->node->io_ctx, host, service)); - this_l->connecting (host, connection); + this_l->add_connection (connection); connection->socket.async_connect (nano::tcp_endpoint (host, service), [this_l, connection](boost::system::error_code const & ec) { if (!ec) { @@ -1035,11 +1034,9 @@ class distributed_work : public std::enable_shared_from_this boost::beast::http::async_write (connection->socket, *request, [this_l, connection, request](boost::system::error_code const & ec, size_t bytes_transferred) { if (!ec) { - this_l->requested (connection->address); boost::beast::http::async_read (connection->socket, connection->buffer, connection->response, [this_l, connection](boost::system::error_code const & ec, size_t bytes_transferred) { if (!ec) { - this_l->received (connection->address); if (connection->response.result () == boost::beast::http::status::ok) { this_l->success (connection->response.body (), connection->address); @@ -1052,6 +1049,7 @@ class distributed_work : public std::enable_shared_from_this } else if (ec == boost::system::errc::operation_canceled) { + // The only case where we send a cancel is if we preempt stopped waiting for the response this_l->cancel (connection); this_l->failure (connection->address); } @@ -1133,8 +1131,8 @@ class distributed_work : public std::enable_shared_from_this std::lock_guard lock (mutex); for (auto & i : connections) { - auto connection = i.second.lock (); - if (connection && connection->awaiting_work) + auto connection = i.lock (); + if (connection) { boost::system::error_code ec; connection->socket.cancel (ec); @@ -1142,9 +1140,6 @@ class distributed_work : public std::enable_shared_from_this { this_l->node->logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.message () % ec.value ())); } - } - if (connection) // may be hanging - { try { connection->socket.close (); @@ -1244,35 +1239,18 @@ class distributed_work : public std::enable_shared_from_this outstanding.erase (address); return outstanding.empty (); } - void connecting (boost::asio::ip::address const & address, std::shared_ptr & connection) - { - std::lock_guard lock (mutex); - connections[address] = connection; - } - void requested (boost::asio::ip::address const & address) - { - std::lock_guard lock (mutex); - auto existing (connections.find (address)); - if (existing != connections.end ()) - { - if (auto connection = existing->second.lock ()) - { - connection->awaiting_work = true; - } - } - } - void received (boost::asio::ip::address const & address) + void add_connection (std::shared_ptr & connection) { std::lock_guard lock (mutex); - connections.erase (address); + connections.push_back (connection); // weak_ptr } std::function callback; unsigned int backoff; // in seconds std::shared_ptr node; nano::block_hash root; std::mutex mutex; - std::map outstanding; - std::map> connections; + std::unordered_map outstanding; + std::vector> connections; std::vector> need_resolve; std::atomic_flag completed; uint64_t difficulty; From 6658997f6800fc3377a21966abc64d26eab71194 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Tue, 20 Aug 2019 16:40:54 +0100 Subject: [PATCH 07/16] Fix bad address from unordered map --- nano/node/node.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index bcf8a1c8b2..c36b9ac1d0 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1010,7 +1010,7 @@ class distributed_work : public std::enable_shared_from_this auto service (i.second); node->background ([this_l, host, service]() { auto connection (std::make_shared (this_l->node->io_ctx, host, service)); - this_l->add_connection (connection); + this_l->add (connection); connection->socket.async_connect (nano::tcp_endpoint (host, service), [this_l, connection](boost::system::error_code const & ec) { if (!ec) { @@ -1124,11 +1124,11 @@ class distributed_work : public std::enable_shared_from_this } void stop () { + std::lock_guard lock (mutex); if (!stopped) { stopped = true; auto this_l (shared_from_this ()); - std::lock_guard lock (mutex); for (auto & i : connections) { auto connection = i.lock (); @@ -1239,17 +1239,17 @@ class distributed_work : public std::enable_shared_from_this outstanding.erase (address); return outstanding.empty (); } - void add_connection (std::shared_ptr & connection) + void add (std::shared_ptr & connection) { std::lock_guard lock (mutex); - connections.push_back (connection); // weak_ptr + connections.push_back (connection); } std::function callback; unsigned int backoff; // in seconds std::shared_ptr node; nano::block_hash root; std::mutex mutex; - std::unordered_map outstanding; + std::map outstanding; std::vector> connections; std::vector> need_resolve; std::atomic_flag completed; From 0d0b3f5294722cb5b892b65424d11cd44c0a7c37 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Tue, 20 Aug 2019 16:49:20 +0100 Subject: [PATCH 08/16] Stop on destructor --- nano/node/node.cpp | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index c36b9ac1d0..28fa312e96 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -948,6 +948,10 @@ class distributed_work : public std::enable_shared_from_this { assert (node_a != nullptr); } + ~distributed_work () + { + stop (); + } distributed_work (unsigned int backoff_a, std::shared_ptr const & node_a, nano::block_hash const & root_a, std::function const & callback_a, uint64_t difficulty_a) : callback (callback_a), backoff (backoff_a), @@ -1128,7 +1132,10 @@ class distributed_work : public std::enable_shared_from_this if (!stopped) { stopped = true; - auto this_l (shared_from_this ()); + if (node->config.work_threads != 0 || node->work.opencl) + { + node->work.cancel (root); + } for (auto & i : connections) { auto connection = i.lock (); @@ -1138,7 +1145,7 @@ class distributed_work : public std::enable_shared_from_this connection->socket.cancel (ec); if (ec) { - this_l->node->logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.message () % ec.value ())); + node->logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.message () % ec.value ())); } try { @@ -1146,7 +1153,7 @@ class distributed_work : public std::enable_shared_from_this } catch (const boost::system::system_error & ec) { - this_l->node->logger.try_log (boost::str (boost::format ("Error closing socket with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.what () % ec.code ())); + node->logger.try_log (boost::str (boost::format ("Error closing socket with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.what () % ec.code ())); } } } @@ -1170,10 +1177,6 @@ class distributed_work : public std::enable_shared_from_this if (!nano::work_validate (root, work, &result_difficulty) && result_difficulty >= difficulty) { set_once (work); - if (node->config.work_threads != 0 || node->work.opencl) - { - node->work.cancel (root); - } } else { From 453fb8924d04a6d754d161e5e9d7f7a640afc344 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Tue, 20 Aug 2019 18:01:48 +0100 Subject: [PATCH 09/16] Correctly stopping work generation on node stop --- nano/lib/work.cpp | 38 +++++++++++++++++++++----------------- nano/node/node.cpp | 10 ++++++---- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/nano/lib/work.cpp b/nano/lib/work.cpp index 7d33751e3c..2bed951c10 100644 --- a/nano/lib/work.cpp +++ b/nano/lib/work.cpp @@ -73,7 +73,7 @@ void nano::work_pool::loop (uint64_t thread) blake2b_init (&hash, sizeof (output)); std::unique_lock lock (mutex); auto pow_sleep = pow_rate_limiter; - while (!done || !pending.empty ()) + while (!done) { auto empty (pending.empty ()); if (thread == 0) @@ -139,26 +139,29 @@ void nano::work_pool::loop (uint64_t thread) void nano::work_pool::cancel (nano::uint256_union const & root_a) { std::lock_guard lock (mutex); - if (!pending.empty ()) + if (!done) { - if (pending.front ().item == root_a) + if (!pending.empty ()) { - ++ticket; + if (pending.front ().item == root_a) + { + ++ticket; + } } + pending.remove_if ([&root_a](decltype (pending)::value_type const & item_a) { + bool result; + if (item_a.item == root_a) + { + item_a.callback (boost::none); + result = true; + } + else + { + result = false; + } + return result; + }); } - pending.remove_if ([&root_a](decltype (pending)::value_type const & item_a) { - bool result; - if (item_a.item == root_a) - { - item_a.callback (boost::none); - result = true; - } - else - { - result = false; - } - return result; - }); } void nano::work_pool::stop () @@ -166,6 +169,7 @@ void nano::work_pool::stop () { std::lock_guard lock (mutex); done = true; + ++ticket; } producer_condition.notify_all (); } diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 28fa312e96..aabdf3f70a 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -664,6 +664,7 @@ void nano::node::stop () if (!stopped.exchange (true)) { logger.always_log ("Node stopping"); + work.stop (); block_processor.stop (); if (block_processor_thread.joinable ()) { @@ -950,7 +951,7 @@ class distributed_work : public std::enable_shared_from_this } ~distributed_work () { - stop (); + stop (true); } distributed_work (unsigned int backoff_a, std::shared_ptr const & node_a, nano::block_hash const & root_a, std::function const & callback_a, uint64_t difficulty_a) : callback (callback_a), @@ -1087,6 +1088,7 @@ class distributed_work : public std::enable_shared_from_this if (work_a) { this_l->set_once (work_a.value ()); + this_l->stop (false); } }, difficulty); @@ -1126,13 +1128,13 @@ class distributed_work : public std::enable_shared_from_this } }); } - void stop () + void stop (bool const local_stop) { std::lock_guard lock (mutex); if (!stopped) { stopped = true; - if (node->config.work_threads != 0 || node->work.opencl) + if (local_stop && (node->config.work_threads != 0 || node->work.opencl)) { node->work.cancel (root); } @@ -1177,6 +1179,7 @@ class distributed_work : public std::enable_shared_from_this if (!nano::work_validate (root, work, &result_difficulty) && result_difficulty >= difficulty) { set_once (work); + stop (true); } else { @@ -1201,7 +1204,6 @@ class distributed_work : public std::enable_shared_from_this if (!completed.test_and_set ()) { callback (work_a); - stop (); } } void failure (boost::asio::ip::address const & address) From fb06b76b4b7338a94e84e18f553fe8a9384b74ab Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Tue, 20 Aug 2019 19:04:04 +0100 Subject: [PATCH 10/16] Use atomic and exchange --- nano/node/node.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index aabdf3f70a..299f83d7bf 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1130,10 +1130,9 @@ class distributed_work : public std::enable_shared_from_this } void stop (bool const local_stop) { - std::lock_guard lock (mutex); - if (!stopped) + if (!stopped.exchange (true)) { - stopped = true; + std::lock_guard lock (mutex); if (local_stop && (node->config.work_threads != 0 || node->work.opencl)) { node->work.cancel (root); @@ -1259,7 +1258,7 @@ class distributed_work : public std::enable_shared_from_this std::vector> need_resolve; std::atomic_flag completed; uint64_t difficulty; - std::atomic_bool stopped{ false }; + std::atomic stopped{ false }; }; } From e97006e958b019af9fef0053ee055c8c69d3e5db Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Tue, 20 Aug 2019 19:05:08 +0100 Subject: [PATCH 11/16] node.work is not stopped on node.stop due to testing setup --- nano/node/node.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 299f83d7bf..0b8c94dddc 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -664,7 +664,6 @@ void nano::node::stop () if (!stopped.exchange (true)) { logger.always_log ("Node stopping"); - work.stop (); block_processor.stop (); if (block_processor_thread.joinable ()) { @@ -685,6 +684,7 @@ void nano::node::stop () wallets.stop (); stats.stop (); write_database_queue.stop (); + // work is not stopped on purpose due to testing setup } } From 0c5b8e9ce0e2ae51955dd348aaec52ea3de17454 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Fri, 23 Aug 2019 08:34:36 +0100 Subject: [PATCH 12/16] Local work generation is now only used in case peers are unresponsive This change enhances the previous behavior. Local work generation is only used after all peers are unresponsive. A flag is set in the node (unresponsive_work_peers) so that for the next distributed work, local generation will start immediately. Work peer requests are still sent, and as soon as one replies with valid work, local generation is delayed again, until all are unresponsive, and so on. This is more of a fallback mechanism when all peers are failing, as the previous behavior would always wait for timeouts on peers (which can be long, 2 minutes here). The only case not handled for simplicity is when multiple work is queued, and the first one has unresponsive peers. In this case, the currently queued work requests will not start work generation immediately, only for the next queued distributed work. --- nano/node/node.cpp | 41 ++++++++++++++++++++++++++++++----------- nano/node/node.hpp | 1 + 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 0b8c94dddc..396ca4ae1c 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1081,8 +1081,10 @@ class distributed_work : public std::enable_shared_from_this }); } } - if (node->config.work_threads != 0 || node->work.opencl) + // Start work generation if peers are not acting correctly + if (node->unresponsive_work_peers && (node->config.work_threads != 0 || node->work.opencl)) { + local_generation_started = true; // clang-format off node->work.generate (root, [this_l](boost::optional const & work_a) { if (work_a) @@ -1177,6 +1179,7 @@ class distributed_work : public std::enable_shared_from_this uint64_t result_difficulty (0); if (!nano::work_validate (root, work, &result_difficulty) && result_difficulty >= difficulty) { + node->unresponsive_work_peers = false; set_once (work); stop (true); } @@ -1214,18 +1217,21 @@ class distributed_work : public std::enable_shared_from_this { if (last) { + node->unresponsive_work_peers = true; if (!completed.test_and_set ()) { - if (backoff == 1 && node->config.logging.work_generation_time ()) + if (!local_generation_started && node->config.work_threads == 0 && !node->work.opencl) { - node->logger.try_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", retrying..."); - } - auto now (std::chrono::steady_clock::now ()); - auto root_l (root); - auto callback_l (callback); - std::weak_ptr node_w (node); - auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5)); - // clang-format off + if (backoff == 1 && node->config.logging.work_generation_time ()) + { + node->logger.try_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", retrying..."); + } + auto now (std::chrono::steady_clock::now ()); + auto root_l (root); + auto callback_l (callback); + std::weak_ptr node_w (node); + auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5)); + // clang-format off node->alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l, callback_l, next_backoff, difficulty = difficulty ] { if (auto node_l = node_w.lock ()) { @@ -1233,7 +1239,19 @@ class distributed_work : public std::enable_shared_from_this work_generation->start (); } }); - // clang-format on + // clang-format on + } + else if (!local_generation_started) + { + if (node->config.logging.work_generation_time ()) + { + node->logger.try_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", using local work generation as backup"); + } + auto work (node->work_generate_blocking (root, difficulty)); + set_once (work); + callback (work); + stop (false); + } } } } @@ -1258,6 +1276,7 @@ class distributed_work : public std::enable_shared_from_this std::vector> need_resolve; std::atomic_flag completed; uint64_t difficulty; + std::atomic local_generation_started{ false }; std::atomic stopped{ false }; }; } diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 0d1ad77b68..5cd0f101d8 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -183,6 +183,7 @@ class node final : public std::enable_shared_from_this nano::wallets wallets; const std::chrono::steady_clock::time_point startup_time; std::chrono::seconds unchecked_cutoff = std::chrono::seconds (7 * 24 * 60 * 60); // Week + std::atomic unresponsive_work_peers{ false }; std::atomic stopped{ false }; static double constexpr price_max = 16.0; static double constexpr free_cutoff = 1024.0; From dcdfa2eea7ddb2af57ab28093c0897d2d6aa900a Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Fri, 23 Aug 2019 10:14:17 +0100 Subject: [PATCH 13/16] Also start local generation immediately if there are no work peers --- nano/node/node.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 396ca4ae1c..6759661f62 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1081,8 +1081,8 @@ class distributed_work : public std::enable_shared_from_this }); } } - // Start work generation if peers are not acting correctly - if (node->unresponsive_work_peers && (node->config.work_threads != 0 || node->work.opencl)) + // Start work generation if peers are not acting correctly, or if there are no peers configured + if ((outstanding.empty () || node->unresponsive_work_peers) && (node->config.work_threads != 0 || node->work.opencl)) { local_generation_started = true; // clang-format off From 0985b1c9bf9ed8d124b38bcabd28d9a07fb28271 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Fri, 23 Aug 2019 12:12:31 +0100 Subject: [PATCH 14/16] Robustify --- nano/node/node.cpp | 77 ++++++++++++++++++++-------------------------- 1 file changed, 33 insertions(+), 44 deletions(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 6759661f62..1f2062702a 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -962,7 +962,7 @@ class distributed_work : public std::enable_shared_from_this difficulty (difficulty_a) { assert (node_a != nullptr); - completed.clear (); + assert (!completed); } void start () { @@ -1006,9 +1006,27 @@ class distributed_work : public std::enable_shared_from_this { auto this_l (shared_from_this ()); + // Start work generation if peers are not acting correctly, or if there are no peers configured + if ((outstanding.empty () || node->unresponsive_work_peers) && (node->config.work_threads != 0 || node->work.opencl)) + { + local_generation_started = true; + // clang-format off + node->background ([this_l]() { + this_l->node->work.generate (this_l->root, [this_l](boost::optional const & work_a) { + if (work_a) + { + this_l->set_once (work_a.value ()); + this_l->stop (false); + } + }, + this_l->difficulty); + }); + // clang-format on + } + if (!outstanding.empty ()) { - std::lock_guard lock (mutex); + std::lock_guard guard (mutex); for (auto const & i : outstanding) { auto host (i.first); @@ -1081,21 +1099,6 @@ class distributed_work : public std::enable_shared_from_this }); } } - // Start work generation if peers are not acting correctly, or if there are no peers configured - if ((outstanding.empty () || node->unresponsive_work_peers) && (node->config.work_threads != 0 || node->work.opencl)) - { - local_generation_started = true; - // clang-format off - node->work.generate (root, [this_l](boost::optional const & work_a) { - if (work_a) - { - this_l->set_once (work_a.value ()); - this_l->stop (false); - } - }, - difficulty); - // clang-format on - } } void cancel (std::shared_ptr connection) { @@ -1203,7 +1206,7 @@ class distributed_work : public std::enable_shared_from_this } void set_once (uint64_t work_a) { - if (!completed.test_and_set ()) + if (!completed.exchange (true)) { callback (work_a); } @@ -1218,20 +1221,18 @@ class distributed_work : public std::enable_shared_from_this if (last) { node->unresponsive_work_peers = true; - if (!completed.test_and_set ()) + if (!completed && !local_generation_started) { - if (!local_generation_started && node->config.work_threads == 0 && !node->work.opencl) + if (backoff == 1 && node->config.logging.work_generation_time ()) { - if (backoff == 1 && node->config.logging.work_generation_time ()) - { - node->logger.try_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", retrying..."); - } - auto now (std::chrono::steady_clock::now ()); - auto root_l (root); - auto callback_l (callback); - std::weak_ptr node_w (node); - auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5)); - // clang-format off + node->logger.always_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", retrying..."); + } + auto now (std::chrono::steady_clock::now ()); + auto root_l (root); + auto callback_l (callback); + std::weak_ptr node_w (node); + auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5)); + // clang-format off node->alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l, callback_l, next_backoff, difficulty = difficulty ] { if (auto node_l = node_w.lock ()) { @@ -1239,19 +1240,7 @@ class distributed_work : public std::enable_shared_from_this work_generation->start (); } }); - // clang-format on - } - else if (!local_generation_started) - { - if (node->config.logging.work_generation_time ()) - { - node->logger.try_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", using local work generation as backup"); - } - auto work (node->work_generate_blocking (root, difficulty)); - set_once (work); - callback (work); - stop (false); - } + // clang-format on } } } @@ -1274,8 +1263,8 @@ class distributed_work : public std::enable_shared_from_this std::map outstanding; std::vector> connections; std::vector> need_resolve; - std::atomic_flag completed; uint64_t difficulty; + std::atomic completed{ false }; std::atomic local_generation_started{ false }; std::atomic stopped{ false }; }; From b4d04e761cc7e6b504c62679feb1d6500519ff2d Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Fri, 23 Aug 2019 13:39:05 +0100 Subject: [PATCH 15/16] Unbreak tests --- nano/node/node.cpp | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 1f2062702a..1a20f9b6bd 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1010,18 +1010,14 @@ class distributed_work : public std::enable_shared_from_this if ((outstanding.empty () || node->unresponsive_work_peers) && (node->config.work_threads != 0 || node->work.opencl)) { local_generation_started = true; - // clang-format off - node->background ([this_l]() { - this_l->node->work.generate (this_l->root, [this_l](boost::optional const & work_a) { - if (work_a) - { - this_l->set_once (work_a.value ()); - this_l->stop (false); - } - }, - this_l->difficulty); - }); - // clang-format on + node->work.generate (this_l->root, [this_l](boost::optional const & work_a) { + if (work_a) + { + this_l->set_once (work_a.value ()); + this_l->stop (false); + } + }, + difficulty); } if (!outstanding.empty ()) From 56665bdca7cd1b55c489ec9d08228a26a76ee727 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Fri, 23 Aug 2019 15:21:36 +0100 Subject: [PATCH 16/16] No need to wrap in node background --- nano/node/node.cpp | 154 ++++++++++++++++++++++----------------------- 1 file changed, 75 insertions(+), 79 deletions(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 1a20f9b6bd..856f6f457c 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1027,71 +1027,69 @@ class distributed_work : public std::enable_shared_from_this { auto host (i.first); auto service (i.second); - node->background ([this_l, host, service]() { - auto connection (std::make_shared (this_l->node->io_ctx, host, service)); - this_l->add (connection); - connection->socket.async_connect (nano::tcp_endpoint (host, service), [this_l, connection](boost::system::error_code const & ec) { - if (!ec) + auto connection (std::make_shared (this_l->node->io_ctx, host, service)); + connections.push_back (connection); + connection->socket.async_connect (nano::tcp_endpoint (host, service), [this_l, connection](boost::system::error_code const & ec) { + if (!ec) + { + std::string request_string; { - std::string request_string; + boost::property_tree::ptree request; + request.put ("action", "work_generate"); + request.put ("hash", this_l->root.to_string ()); + request.put ("difficulty", nano::to_string_hex (this_l->difficulty)); + std::stringstream ostream; + boost::property_tree::write_json (ostream, request); + request_string = ostream.str (); + } + auto request (std::make_shared> ()); + request->method (boost::beast::http::verb::post); + request->set (boost::beast::http::field::content_type, "application/json"); + request->target ("/"); + request->version (11); + request->body () = request_string; + request->prepare_payload (); + boost::beast::http::async_write (connection->socket, *request, [this_l, connection, request](boost::system::error_code const & ec, size_t bytes_transferred) { + if (!ec) { - boost::property_tree::ptree request; - request.put ("action", "work_generate"); - request.put ("hash", this_l->root.to_string ()); - request.put ("difficulty", nano::to_string_hex (this_l->difficulty)); - std::stringstream ostream; - boost::property_tree::write_json (ostream, request); - request_string = ostream.str (); - } - auto request (std::make_shared> ()); - request->method (boost::beast::http::verb::post); - request->set (boost::beast::http::field::content_type, "application/json"); - request->target ("/"); - request->version (11); - request->body () = request_string; - request->prepare_payload (); - boost::beast::http::async_write (connection->socket, *request, [this_l, connection, request](boost::system::error_code const & ec, size_t bytes_transferred) { - if (!ec) - { - boost::beast::http::async_read (connection->socket, connection->buffer, connection->response, [this_l, connection](boost::system::error_code const & ec, size_t bytes_transferred) { - if (!ec) + boost::beast::http::async_read (connection->socket, connection->buffer, connection->response, [this_l, connection](boost::system::error_code const & ec, size_t bytes_transferred) { + if (!ec) + { + if (connection->response.result () == boost::beast::http::status::ok) { - if (connection->response.result () == boost::beast::http::status::ok) - { - this_l->success (connection->response.body (), connection->address); - } - else - { - this_l->node->logger.try_log (boost::str (boost::format ("Work peer responded with an error %1% %2%: %3%") % connection->address % connection->port % connection->response.result ())); - this_l->failure (connection->address); - } - } - else if (ec == boost::system::errc::operation_canceled) - { - // The only case where we send a cancel is if we preempt stopped waiting for the response - this_l->cancel (connection); - this_l->failure (connection->address); + this_l->success (connection->response.body (), connection->address); } else { - this_l->node->logger.try_log (boost::str (boost::format ("Unable to read from work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); + this_l->node->logger.try_log (boost::str (boost::format ("Work peer responded with an error %1% %2%: %3%") % connection->address % connection->port % connection->response.result ())); this_l->failure (connection->address); } - }); - } - else - { - this_l->node->logger.try_log (boost::str (boost::format ("Unable to write to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); - this_l->failure (connection->address); - } - }); - } - else - { - this_l->node->logger.try_log (boost::str (boost::format ("Unable to connect to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); - this_l->failure (connection->address); - } - }); + } + else if (ec == boost::system::errc::operation_canceled) + { + // The only case where we send a cancel is if we preempt stopped waiting for the response + this_l->cancel (connection); + this_l->failure (connection->address); + } + else + { + this_l->node->logger.try_log (boost::str (boost::format ("Unable to read from work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); + this_l->failure (connection->address); + } + }); + } + else + { + this_l->node->logger.try_log (boost::str (boost::format ("Unable to write to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); + this_l->failure (connection->address); + } + }); + } + else + { + this_l->node->logger.try_log (boost::str (boost::format ("Unable to connect to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); + this_l->failure (connection->address); + } }); } } @@ -1216,27 +1214,30 @@ class distributed_work : public std::enable_shared_from_this { if (last) { - node->unresponsive_work_peers = true; - if (!completed && !local_generation_started) + if (!completed) { - if (backoff == 1 && node->config.logging.work_generation_time ()) + node->unresponsive_work_peers = true; + if (!local_generation_started) { - node->logger.always_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", retrying..."); - } - auto now (std::chrono::steady_clock::now ()); - auto root_l (root); - auto callback_l (callback); - std::weak_ptr node_w (node); - auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5)); - // clang-format off - node->alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l, callback_l, next_backoff, difficulty = difficulty ] { - if (auto node_l = node_w.lock ()) + if (backoff == 1 && node->config.logging.work_generation_time ()) { - auto work_generation (std::make_shared (next_backoff, node_l, root_l, callback_l, difficulty)); - work_generation->start (); + node->logger.always_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", retrying..."); } - }); - // clang-format on + auto now (std::chrono::steady_clock::now ()); + auto root_l (root); + auto callback_l (callback); + std::weak_ptr node_w (node); + auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5)); + // clang-format off + node->alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l, callback_l, next_backoff, difficulty = difficulty ] { + if (auto node_l = node_w.lock ()) + { + auto work_generation (std::make_shared (next_backoff, node_l, root_l, callback_l, difficulty)); + work_generation->start (); + } + }); + // clang-format on + } } } } @@ -1246,11 +1247,6 @@ class distributed_work : public std::enable_shared_from_this outstanding.erase (address); return outstanding.empty (); } - void add (std::shared_ptr & connection) - { - std::lock_guard lock (mutex); - connections.push_back (connection); - } std::function callback; unsigned int backoff; // in seconds std::shared_ptr node;