Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancements to OpenCL work handling #2247

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions nano/core_test/work_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ TEST (work, opencl)
auto opencl (nano::opencl_work::create (true, { 0, 0, 16 * 1024 }, logger));
if (opencl != nullptr)
{
nano::work_pool pool (std::numeric_limits<unsigned>::max (), std::chrono::nanoseconds (0), opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a) {
nano::work_pool pool (std::numeric_limits<unsigned>::max (), std::chrono::nanoseconds (0), opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a, std::atomic<int> & ticket_a) {
return opencl->generate_work (root_a, difficulty_a);
}
: std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t)> (nullptr));
: std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t, std::atomic<int> & ticket_a)> (nullptr));
ASSERT_NE (nullptr, pool.opencl);
nano::uint256_union root;
uint64_t difficulty (0xff00000000000000);
Expand Down
75 changes: 41 additions & 34 deletions nano/lib/work.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ uint64_t nano::work_value (nano::block_hash const & root_a, uint64_t work_a)
return result;
}

nano::work_pool::work_pool (unsigned max_threads_a, std::chrono::nanoseconds pow_rate_limiter_a, std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t)> opencl_a) :
nano::work_pool::work_pool (unsigned max_threads_a, std::chrono::nanoseconds pow_rate_limiter_a, std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t, std::atomic<int> &)> opencl_a) :
ticket (0),
done (false),
pow_rate_limiter (pow_rate_limiter_a),
Expand All @@ -42,6 +42,11 @@ opencl (opencl_a)
boost::thread::attributes attrs;
nano::thread_attributes::set (attrs);
auto count (network_constants.is_test_network () ? 1 : std::min (max_threads_a, std::max (1u, boost::thread::hardware_concurrency ())));
if (opencl)
{
// One thread to handle OpenCL
++count;
}
for (auto i (0u); i < count; ++i)
{
auto thread (boost::thread (attrs, [this, i]() {
Expand Down Expand Up @@ -87,27 +92,40 @@ void nano::work_pool::loop (uint64_t thread)
int ticket_l (ticket);
lock.unlock ();
output = 0;
// ticket != ticket_l indicates a different thread found a solution and we should stop
while (ticket == ticket_l && output < current_l.difficulty)
boost::optional<uint64_t> opt_work;
if (thread == 0 && opencl)
{
// Don't query main memory every iteration in order to reduce memory bus traffic
// All operations here operate on stack memory
// Count iterations down to zero since comparing to zero is easier than comparing to another number
unsigned iteration (256);
while (iteration && output < current_l.difficulty)
{
work = rng.next ();
blake2b_update (&hash, reinterpret_cast<uint8_t *> (&work), sizeof (work));
blake2b_update (&hash, current_l.item.bytes.data (), current_l.item.bytes.size ());
blake2b_final (&hash, reinterpret_cast<uint8_t *> (&output), sizeof (output));
blake2b_init (&hash, sizeof (output));
iteration -= 1;
}

// Add a rate limiter (if specified) to the pow calculation to save some CPUs which don't want to operate at full throttle
if (pow_sleep != std::chrono::nanoseconds (0))
opt_work = opencl (current_l.item, current_l.difficulty, ticket);
}
if (opt_work.is_initialized ())
{
work = *opt_work;
output = work_value (current_l.item, work);
}
else
{
// ticket != ticket_l indicates a different thread found a solution and we should stop
while (ticket == ticket_l && output < current_l.difficulty)
{
std::this_thread::sleep_for (pow_sleep);
// Don't query main memory every iteration in order to reduce memory bus traffic
// All operations here operate on stack memory
// Count iterations down to zero since comparing to zero is easier than comparing to another number
unsigned iteration (256);
while (iteration && output < current_l.difficulty)
{
work = rng.next ();
blake2b_update (&hash, reinterpret_cast<uint8_t *> (&work), sizeof (work));
blake2b_update (&hash, current_l.item.bytes.data (), current_l.item.bytes.size ());
blake2b_final (&hash, reinterpret_cast<uint8_t *> (&output), sizeof (output));
blake2b_init (&hash, sizeof (output));
iteration -= 1;
}

// Add a rate limiter (if specified) to the pow calculation to save some CPUs which don't want to operate at full throttle
if (pow_sleep != std::chrono::nanoseconds (0))
{
std::this_thread::sleep_for (pow_sleep);
}
}
}
lock.lock ();
Expand Down Expand Up @@ -183,22 +201,11 @@ void nano::work_pool::generate (nano::uint256_union const & hash_a, std::functio
{
assert (!hash_a.is_zero ());
boost::optional<uint64_t> result;
if (opencl)
{
result = opencl (hash_a, difficulty_a);
}
if (!result)
{
{
std::lock_guard<std::mutex> lock (mutex);
pending.push_back ({ hash_a, callback_a, difficulty_a });
}
producer_condition.notify_all ();
}
else
{
callback_a (result);
std::lock_guard<std::mutex> lock (mutex);
pending.push_back ({ hash_a, callback_a, difficulty_a });
}
producer_condition.notify_all ();
}

uint64_t nano::work_pool::generate (nano::uint256_union const & hash_a)
Expand Down
4 changes: 2 additions & 2 deletions nano/lib/work.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class work_item final
class work_pool final
{
public:
work_pool (unsigned, std::chrono::nanoseconds = std::chrono::nanoseconds (0), std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t)> = nullptr);
work_pool (unsigned, std::chrono::nanoseconds = std::chrono::nanoseconds (0), std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t, std::atomic<int> &)> = nullptr);
~work_pool ();
void loop (uint64_t);
void stop ();
Expand All @@ -46,7 +46,7 @@ class work_pool final
std::mutex mutex;
std::condition_variable producer_condition;
std::chrono::nanoseconds pow_rate_limiter;
std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t)> opencl;
std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t, std::atomic<int> &)> opencl;
nano::observer_set<bool> work_observers;
};

Expand Down
6 changes: 3 additions & 3 deletions nano/nano_node/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ void nano_daemon::daemon::run (boost::filesystem::path const & data_path, nano::
nano::logger_mt logger{ config.node.logging.min_time_between_log_output };
boost::asio::io_context io_ctx;
auto opencl (nano::opencl_work::create (config.opencl_enable, config.opencl, logger));
nano::work_pool opencl_work (config.node.work_threads, config.node.pow_sleep_interval, opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a) {
return opencl->generate_work (root_a, difficulty_a);
nano::work_pool opencl_work (config.node.work_threads, config.node.pow_sleep_interval, opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a, std::atomic<int> & ticket_a) {
return opencl->generate_work (root_a, difficulty_a, ticket_a);
}
: std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t)> (nullptr));
: std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t, std::atomic<int> &)> (nullptr));
nano::alarm alarm (io_ctx);
nano::node_init init;
try
Expand Down
4 changes: 2 additions & 2 deletions nano/nano_node/entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -440,10 +440,10 @@ int main (int argc, char * const * argv)
{
nano::logger_mt logger;
auto opencl (nano::opencl_work::create (true, { platform, device, threads }, logger));
nano::work_pool work_pool (std::numeric_limits<unsigned>::max (), std::chrono::nanoseconds (0), opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a) {
nano::work_pool work_pool (std::numeric_limits<unsigned>::max (), std::chrono::nanoseconds (0), opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a, std::atomic<int> &) {
return opencl->generate_work (root_a, difficulty_a);
}
: std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t)> (nullptr));
: std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t, std::atomic<int> &)> (nullptr));
nano::change_block block (0, 0, nano::keypair ().prv, 0, 0);
std::cerr << boost::str (boost::format ("Starting OpenCL generation profiling. Platform: %1%. Device: %2%. Threads: %3%. Difficulty: %4$#x\n") % platform % device % threads % difficulty);
for (uint64_t i (0); true; ++i)
Expand Down
4 changes: 2 additions & 2 deletions nano/nano_wallet/entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ int run_wallet (QApplication & application, int argc, char * const * argv, boost
std::shared_ptr<nano_qt::wallet> gui;
nano::set_application_icon (application);
auto opencl (nano::opencl_work::create (config.opencl_enable, config.opencl, logger));
nano::work_pool work (config.node.work_threads, config.node.pow_sleep_interval, opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a) {
nano::work_pool work (config.node.work_threads, config.node.pow_sleep_interval, opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a, std::atomic<int> &) {
return opencl->generate_work (root_a, difficulty_a);
}
: std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t)> (nullptr));
: std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t, std::atomic<int> &)> (nullptr));
nano::alarm alarm (io_ctx);
nano::node_init init;
nano::node_flags flags;
Expand Down
2 changes: 1 addition & 1 deletion nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ startup_time (std::chrono::steady_clock::now ())
auto network_label = network_params.network.get_current_network_as_string ();
logger.always_log ("Active network: ", network_label);

logger.always_log (boost::str (boost::format ("Work pool running %1% threads") % work.threads.size ()));
logger.always_log (boost::str (boost::format ("Work pool running %1% threads %2%") % work.threads.size () % (work.opencl ? "(1 for OpenCL)" : "")));
logger.always_log (boost::str (boost::format ("%1% work peers configured") % config.work_peers.size ()));
if (config.work_peers.empty () && config.work_threads == 0 && !work.opencl)
{
Expand Down
2 changes: 1 addition & 1 deletion nano/node/nodeconfig.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class node_config
static std::chrono::seconds constexpr keepalive_period = std::chrono::seconds (60);
static std::chrono::seconds constexpr keepalive_cutoff = keepalive_period * 5;
static std::chrono::minutes constexpr wallet_backup_interval = std::chrono::minutes (5);
size_t bandwidth_limit{ 5 * 1024 * 1024 }; // 5Mb/s
size_t bandwidth_limit{ 5 * 1024 * 1024 }; // 5MB/s
std::chrono::milliseconds conf_height_processor_batch_min_time{ 50 };
bool backup_before_upgrade{ false };
std::chrono::seconds work_watcher_period{ std::chrono::seconds (5) };
Expand Down
9 changes: 8 additions & 1 deletion nano/node/openclwork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -688,14 +688,21 @@ nano::opencl_work::~opencl_work ()
}

// Convenience overload for callers that have no cancellation ticket to share.
// Forwards to the three-argument overload and returns its result unchanged.
boost::optional<uint64_t> nano::opencl_work::generate_work (nano::uint256_union const & root_a, uint64_t const difficulty_a)
{
	// Function-local ticket starting at zero; it is only handed to the forwarding call below,
	// so the caller of this overload has no handle through which to change it.
	std::atomic<int> private_ticket (0);
	return generate_work (root_a, difficulty_a, private_ticket);
}

boost::optional<uint64_t> nano::opencl_work::generate_work (nano::uint256_union const & root_a, uint64_t const difficulty_a, std::atomic<int> & ticket_a)
{
std::lock_guard<std::mutex> lock (mutex);
bool error (false);
int ticket_l (ticket_a);
uint64_t result (0);
uint64_t computed_difficulty (0);
unsigned thread_count (config.threads);
size_t work_size[] = { thread_count, 0, 0 };
while ((nano::work_validate (root_a, result, &computed_difficulty) || computed_difficulty < difficulty_a) && !error)
while ((nano::work_validate (root_a, result, &computed_difficulty) || computed_difficulty < difficulty_a) && !error && ticket_a == ticket_l)
{
result = rand.next ();
cl_int write_error1 = clEnqueueWriteBuffer (queue, attempt_buffer, false, 0, sizeof (uint64_t), &result, 0, nullptr, nullptr);
Expand Down
1 change: 1 addition & 0 deletions nano/node/openclwork.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class opencl_work
opencl_work (bool &, nano::opencl_config const &, nano::opencl_environment &, nano::logger_mt &);
~opencl_work ();
boost::optional<uint64_t> generate_work (nano::uint256_union const &, uint64_t const);
boost::optional<uint64_t> generate_work (nano::uint256_union const &, uint64_t const, std::atomic<int> &);
static std::unique_ptr<opencl_work> create (bool, nano::opencl_config const &, nano::logger_mt &);
nano::opencl_config const & config;
std::mutex mutex;
Expand Down