From 8744ba7295c255b9a10d78d3c6da565c733a9fce Mon Sep 17 00:00:00 2001
From: liutiexing
Date: Wed, 22 Sep 2021 20:09:22 +0000
Subject: [PATCH 1/3] add align for WorkQueue

---
 .../framework/new_executor/CMakeLists.txt     |  2 +-
 .../framework/new_executor/event_count.h      |  5 +-
 .../new_executor/nonblocking_threadpool.h     | 16 ++---
 .../fluid/framework/new_executor/run_queue.h  |  2 +-
 .../fluid/framework/new_executor/workqueue.cc | 16 +++--
 .../framework/new_executor/workqueue_utils.cc | 59 +++++++++++++++++++
 .../framework/new_executor/workqueue_utils.h  |  4 ++
 7 files changed, 88 insertions(+), 16 deletions(-)
 create mode 100644 paddle/fluid/framework/new_executor/workqueue_utils.cc

diff --git a/paddle/fluid/framework/new_executor/CMakeLists.txt b/paddle/fluid/framework/new_executor/CMakeLists.txt
index f66d743620b45..365083a34782a 100644
--- a/paddle/fluid/framework/new_executor/CMakeLists.txt
+++ b/paddle/fluid/framework/new_executor/CMakeLists.txt
@@ -2,7 +2,7 @@ set(INTERPRETERCORE_DEPS op_registry device_context scope framework_proto data_f
 lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
 graph_to_program_pass variable_helper timer monitor)
 
-cc_library(workqueue SRCS workqueue.cc DEPS enforce)
+cc_library(workqueue SRCS workqueue.cc workqueue_utils.cc DEPS enforce)
 cc_library(interpretercore_garbage_collector SRCS interpretercore_garbage_collector.cc DEPS workqueue ${DEVICE_EVENT_LIBS})
 cc_library(interpretercore_util SRCS interpretercore_util.cc DEPS ${INTERPRETERCORE_DEPS} workqueue)
 cc_library(event_manager SRCS event_manager.cc DEPS ${DEVICE_EVENT_LIBS} glog)
diff --git a/paddle/fluid/framework/new_executor/event_count.h b/paddle/fluid/framework/new_executor/event_count.h
index f374456ca3814..0c6d49042d22d 100644
--- a/paddle/fluid/framework/new_executor/event_count.h
+++ b/paddle/fluid/framework/new_executor/event_count.h
@@ -50,6 +50,7 @@
 #include <atomic>
 #include <cassert>
 #include <cstdlib>
+#include "paddle/fluid/framework/new_executor/workqueue_utils.h"
 
 namespace paddle {
 namespace framework {
@@ -60,7 +61,7 @@ class EventCount {
   explicit EventCount(size_t waiter_num) : state_(kStackMask) {
     assert(waiter_num < (1 << kWaiterBits) - 1);
-    void* buffer = malloc(sizeof(Waiter) * waiter_num);
+    void* buffer = AlignedMalloc(sizeof(Waiter) * waiter_num, alignof(Waiter));
     if (buffer == nullptr) {
       return;
     }
@@ -78,7 +79,7 @@ class EventCount {
   ~EventCount() {
     // Ensure there are no waiters.
     assert(state_.load() == kStackMask);
-    free(waiters_);
+    AlignedFree(waiters_);
   }
 
   Waiter* GetWaiter(size_t waiter_index) {
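
Note on the event_count.h hunk above: plain malloc() only guarantees alignof(std::max_align_t), typically 16 bytes, so constructing an over-aligned type (for example one padded to a 64-byte cache line, which is what this series is adding) in malloc'ed storage is undefined behavior. Hence the switch to AlignedMalloc/AlignedFree keyed on alignof(Waiter). A minimal standalone illustration; FakeWaiter is a made-up stand-in, not the real Waiter type:

    #include <cstddef>
    #include <cstdio>

    struct alignas(64) FakeWaiter {  // stand-in for a cache-line padded waiter
      int state;
    };

    int main() {
      // malloc() need only satisfy alignof(std::max_align_t); any stricter
      // requirement (like the 64 here) needs an alignment-aware allocator.
      std::printf("alignof(FakeWaiter) = %zu, malloc guarantees only %zu\n",
                  alignof(FakeWaiter), alignof(std::max_align_t));
      return 0;
    }
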
diff --git a/paddle/fluid/framework/new_executor/nonblocking_threadpool.h b/paddle/fluid/framework/new_executor/nonblocking_threadpool.h
index 56edcecd17f37..56a036527a56b 100644
--- a/paddle/fluid/framework/new_executor/nonblocking_threadpool.h
+++ b/paddle/fluid/framework/new_executor/nonblocking_threadpool.h
@@ -56,9 +56,9 @@ class TaskTracker {
   }
 
  private:
-  std::atomic<uint64_t> num_tasks_{0};
-  EventCount wait_empty_cv_;
-  std::atomic<bool> wait_empty_{false};
+  alignas(64) std::atomic<uint64_t> num_tasks_{0};
+  alignas(64) EventCount wait_empty_cv_;
+  alignas(64) std::atomic<bool> wait_empty_{false};
 };
 
 template <typename Environment>
 class ThreadPoolTempl {
@@ -70,15 +70,15 @@ class ThreadPoolTempl {
   ThreadPoolTempl(int num_threads, bool allow_spinning,
                   Environment env = Environment())
       : env_(env),
-        num_threads_(num_threads),
         allow_spinning_(allow_spinning),
-        thread_data_(num_threads),
         global_steal_partition_(EncodePartition(0, num_threads_)),
         blocked_(0),
         spinning_(0),
         done_(false),
         cancelled_(false),
-        ec_(num_threads_) {
+        ec_(num_threads),
+        num_threads_(num_threads),
+        thread_data_(num_threads) {
     // Calculate coprimes of all numbers [1, num_threads].
     // Coprimes are used for random walks over all threads in Steal
     // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
@@ -259,9 +259,7 @@ class ThreadPoolTempl {
   };
 
   Environment env_;
-  const int num_threads_;
   const bool allow_spinning_;
-  std::vector<ThreadData> thread_data_;
   std::vector<std::vector<unsigned>> all_coprimes_;
   unsigned global_steal_partition_;
   std::atomic<unsigned> blocked_;
@@ -269,6 +267,8 @@ class ThreadPoolTempl {
   std::atomic<bool> done_;
   std::atomic<bool> cancelled_;
   EventCount ec_;
+  const int num_threads_;
+  std::vector<ThreadData> thread_data_;
 
   // Main worker thread loop.
   void WorkerLoop(int thread_id) {
diff --git a/paddle/fluid/framework/new_executor/run_queue.h b/paddle/fluid/framework/new_executor/run_queue.h
index 707aadd315885..13035237ff8b4 100644
--- a/paddle/fluid/framework/new_executor/run_queue.h
+++ b/paddle/fluid/framework/new_executor/run_queue.h
@@ -204,7 +204,6 @@ class RunQueue {
     kReady,
   };
 
-  std::mutex mutex_;
   // Low log(kSize) + 1 bits in front_ and back_ contain rolling index of
   // front/back, respectively. The remaining bits contain modification counters
   // that are incremented on Push operations. This allows us to (1) distinguish
@@ -214,6 +213,7 @@ class RunQueue {
   // modification counters.
   alignas(64) std::atomic<unsigned> front_;
   alignas(64) std::atomic<unsigned> back_;
+  std::mutex mutex_;
   Elem array_[kSize];
 
   // SizeOrNotEmpty returns current queue size; if NeedSizeEstimate is false,
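
Note on the alignas(64) changes: these are hot atomics touched concurrently by different threads (front_ by consumers, back_ by producers; the TaskTracker counters by every worker). Padding each to its own 64-byte cache line avoids false sharing, where logically independent variables share a line and each write by one thread invalidates the other thread's cached copy. A self-contained sketch of the layout idea; the names here are illustrative:

    #include <atomic>

    struct QueueCounters {
      alignas(64) std::atomic<unsigned> front{0};  // popped by consumers
      alignas(64) std::atomic<unsigned> back{0};   // pushed by producers
    };

    // Each member starts its own 64-byte line, so a producer writing 'back'
    // no longer invalidates the cache line that holds 'front'.
    static_assert(alignof(QueueCounters) == 64 && sizeof(QueueCounters) >= 128,
                  "front and back should land on different cache lines");
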
diff --git a/paddle/fluid/framework/new_executor/workqueue.cc b/paddle/fluid/framework/new_executor/workqueue.cc
index 184d9d6998464..a9dfddd689669 100644
--- a/paddle/fluid/framework/new_executor/workqueue.cc
+++ b/paddle/fluid/framework/new_executor/workqueue.cc
@@ -18,14 +18,18 @@ class WorkQueueImpl : public WorkQueue {
   explicit WorkQueueImpl(const WorkQueueOptions& options)
       : WorkQueue(options), queue_(nullptr), tracker_(nullptr) {
     if (options_.track_task) {
-      tracker_ = new TaskTracker;
+      void* storage = AlignedMalloc(sizeof(TaskTracker), alignof(TaskTracker));
+      tracker_ = new (storage) TaskTracker;
     }
     queue_ = new NonblockingThreadPool(options_.num_threads,
                                        options_.allow_spinning);
   }
 
   virtual ~WorkQueueImpl() {
-    delete tracker_;
+    if (tracker_ != nullptr) {
+      tracker_->~TaskTracker();
+      AlignedFree(tracker_);
+    }
     delete queue_;
   }
 
@@ -89,7 +93,8 @@ WorkQueueGroupImpl::WorkQueueGroupImpl(
   for (size_t idx = 0; idx < num_queues; ++idx) {
     const auto& options = queues_options_[idx];
     if (options.track_task && tracker_ == nullptr) {
-      tracker_ = new TaskTracker;
+      void* storage = AlignedMalloc(sizeof(TaskTracker), alignof(TaskTracker));
+      tracker_ = new (storage) TaskTracker;
     }
     queues_[idx] = new (&queues_storage_[idx])
         NonblockingThreadPool(options.num_threads, options.allow_spinning);
@@ -100,7 +105,10 @@ WorkQueueGroupImpl::~WorkQueueGroupImpl() {
   for (auto queue : queues_) {
     queue->~NonblockingThreadPool();
   }
-  delete tracker_;
+  if (tracker_ != nullptr) {
+    tracker_->~TaskTracker();
+    AlignedFree(tracker_);
+  }
   free(queues_storage_);
 }
diff --git a/paddle/fluid/framework/new_executor/workqueue_utils.cc b/paddle/fluid/framework/new_executor/workqueue_utils.cc
new file mode 100644
index 0000000000000..2ea49e676a807
--- /dev/null
+++ b/paddle/fluid/framework/new_executor/workqueue_utils.cc
@@ -0,0 +1,59 @@
+// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/framework/new_executor/workqueue_utils.h"
+#include <cassert>
+#include <cstdint>
+
+namespace paddle {
+namespace framework {
+
+void* AlignedMalloc(size_t size, size_t alignment) {
+  assert(alignment >= sizeof(void*) && (alignment & (alignment - 1)) == 0);
+  size = (size + alignment - 1) / alignment * alignment;
+#if defined(_POSIX_C_SOURCE) && _POSIX_C_SOURCE >= 200112L
+  void* aligned_mem = nullptr;
+  if (posix_memalign(&aligned_mem, alignment, size) != 0) {
+    aligned_mem = nullptr;
+  }
+  return aligned_mem;
+#elif defined(_WIN32)
+  return _aligned_malloc(size, alignment);
+#else
+  void* mem = malloc(size + alignment);
+  if (mem == nullptr) {
+    return nullptr;
+  }
+  size_t adjust = alignment - reinterpret_cast<uintptr_t>(mem) % alignment;
+  void* aligned_mem = reinterpret_cast<char*>(mem) + adjust;
+  *(reinterpret_cast<void**>(aligned_mem) - 1) = mem;
+  assert(reinterpret_cast<uintptr_t>(aligned_mem) % alignment == 0);
+  return aligned_mem;
+#endif
+}
+
+void AlignedFree(void* mem_ptr) {
+#if defined(_POSIX_C_SOURCE) && _POSIX_C_SOURCE >= 200112L
+  free(mem_ptr);
+#elif defined(_WIN32)
+  _aligned_free(mem_ptr);
+#else
+  if (mem_ptr) {
+    free(*(reinterpret_cast<void**>(mem_ptr) - 1));
+  }
+#endif
+}
+
+}  // namespace framework
+}  // namespace paddle
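
Note on the portable fallback branch of AlignedMalloc: it over-allocates by 'alignment' bytes, rounds the returned address up to the next boundary, and stashes the pointer malloc actually returned in the word just below the aligned address so that AlignedFree can recover it. The stash always fits, because malloc's own result is at least sizeof(void*)-aligned, which makes 'adjust' a multiple of that same alignment. A worked example with made-up addresses:

    // malloc(size + 64) returns mem = 0x1010      (16-byte aligned, say)
    // adjust      = 64 - (0x1010 % 64) = 64 - 16 = 48
    // aligned_mem = 0x1010 + 48        = 0x1040   (64-byte aligned)
    // *(0x1040 - 8) = 0x1010                      (stash the original pointer)
    //
    // AlignedFree(0x1040) then reads the stash back:
    // free(*((void**)0x1040 - 1))  ==  free(0x1010)

The pair must never be mixed with plain free(), which is exactly why the event_count.h destructor above switched to AlignedFree.
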
diff --git a/paddle/fluid/framework/new_executor/workqueue_utils.h b/paddle/fluid/framework/new_executor/workqueue_utils.h
index 00183eadcbb5c..6907f2f17da0d 100644
--- a/paddle/fluid/framework/new_executor/workqueue_utils.h
+++ b/paddle/fluid/framework/new_executor/workqueue_utils.h
@@ -59,5 +59,9 @@ class CounterGuard {
   Holder* counter_holder_{nullptr};
 };
 
+void* AlignedMalloc(size_t size, size_t alignment);
+
+void AlignedFree(void* memory_ptr);
+
 }  // namespace framework
 }  // namespace paddle

From 6f00ace9b1b5c5d45b5843472b5eeb4ce1602509 Mon Sep 17 00:00:00 2001
From: liutiexing
Date: Thu, 23 Sep 2021 13:44:55 +0000
Subject: [PATCH 2/3] add spinlock

---
 .../fluid/framework/new_executor/run_queue.h  |  9 +++--
 .../fluid/framework/new_executor/workqueue.cc |  4 +-
 .../framework/new_executor/workqueue_utils.h  | 40 +++++++++++++++++++
 3 files changed, 47 insertions(+), 6 deletions(-)

diff --git a/paddle/fluid/framework/new_executor/run_queue.h b/paddle/fluid/framework/new_executor/run_queue.h
index 13035237ff8b4..1d32eb05d4d51 100644
--- a/paddle/fluid/framework/new_executor/run_queue.h
+++ b/paddle/fluid/framework/new_executor/run_queue.h
@@ -37,6 +37,7 @@
 #include <atomic>
 #include <cstdint>
 #include <mutex>
+#include "paddle/fluid/framework/new_executor/workqueue_utils.h"
 
 namespace paddle {
 namespace framework {
@@ -101,7 +102,7 @@ class RunQueue {
   // PushBack adds w at the end of the queue.
   // If queue is full returns w, otherwise returns default-constructed Work.
   Work PushBack(Work w) {
-    std::unique_lock<std::mutex> lock(mutex_);
+    std::unique_lock<SpinLock> lock(mutex_);
     unsigned back = back_.load(std::memory_order_relaxed);
     Elem* e = &array_[(back - 1) & kMask];
     uint8_t s = e->state.load(std::memory_order_relaxed);
@@ -123,7 +124,7 @@ class RunQueue {
       return Work();
     }
 
-    std::unique_lock<std::mutex> lock(mutex_);
+    std::unique_lock<SpinLock> lock(mutex_);
     unsigned back = back_.load(std::memory_order_relaxed);
     Elem* e = &array_[back & kMask];
     uint8_t s = e->state.load(std::memory_order_relaxed);
@@ -145,7 +146,7 @@ class RunQueue {
       return 0;
     }
 
-    std::unique_lock<std::mutex> lock(mutex_);
+    std::unique_lock<SpinLock> lock(mutex_);
     unsigned back = back_.load(std::memory_order_relaxed);
     unsigned size = Size();
     unsigned mid = back;
@@ -213,7 +214,7 @@ class RunQueue {
   // modification counters.
   alignas(64) std::atomic<unsigned> front_;
   alignas(64) std::atomic<unsigned> back_;
-  std::mutex mutex_;
+  SpinLock mutex_;
   Elem array_[kSize];
 
   // SizeOrNotEmpty returns current queue size; if NeedSizeEstimate is false,
diff --git a/paddle/fluid/framework/new_executor/workqueue.cc b/paddle/fluid/framework/new_executor/workqueue.cc
index bc5a4e27dc528..8c6eeab4d5c0a 100644
--- a/paddle/fluid/framework/new_executor/workqueue.cc
+++ b/paddle/fluid/framework/new_executor/workqueue.cc
@@ -166,7 +166,7 @@ std::unique_ptr<WorkQueue> CreateMultiThreadedWorkQueue(
                         "WorkQueueOptions.num_threads must be "
                         "greater than 1."));
   std::unique_ptr<WorkQueue> ptr(new WorkQueueImpl(options));
-  return ptr;
+  return std::move(ptr);
 }
 
 std::unique_ptr<WorkQueueGroup> CreateWorkQueueGroup(
@@ -176,7 +176,7 @@ std::unique_ptr<WorkQueueGroup> CreateWorkQueueGroup(
                         "For a WorkQueueGroup, the number of WorkQueueOptions "
                         "must be greater than 1."));
   std::unique_ptr<WorkQueueGroup> ptr(new WorkQueueGroupImpl(queues_options));
-  return ptr;
+  return std::move(ptr);
 }
 
 }  // namespace framework
diff --git a/paddle/fluid/framework/new_executor/workqueue_utils.h b/paddle/fluid/framework/new_executor/workqueue_utils.h
index 6907f2f17da0d..1aab4529b023c 100644
--- a/paddle/fluid/framework/new_executor/workqueue_utils.h
+++ b/paddle/fluid/framework/new_executor/workqueue_utils.h
@@ -14,9 +14,16 @@
 
 #pragma once
 
+#include <atomic>
 #include <cstddef>
 #include <cstdlib>
 #include <functional>
+#if defined(_M_X64) || defined(__x86_64__) || defined(_M_IX86) || \
+    defined(__i386__)
+#define __WQ_x86__
+#include <xmmintrin.h>
+#endif
+#include <thread>
 #include "paddle/fluid/platform/enforce.h"
 
 namespace paddle {
@@ -63,5 +70,38 @@ void* AlignedMalloc(size_t size, size_t alignment);
 
 void AlignedFree(void* memory_ptr);
 
+static inline void CpuRelax() {
+#if defined(__WQ_x86__)
+  _mm_pause();
+#endif
+}
+
+class SpinLock {
+ public:
+  void lock() {
+    for (;;) {
+      if (!lock_.exchange(true, std::memory_order_acquire)) {
+        break;
+      }
+      constexpr int kMaxLoop = 32;
+      for (int loop = 1; lock_.load(std::memory_order_relaxed);) {
+        if (loop <= kMaxLoop) {
+          for (int i = 1; i <= loop; ++i) {
+            CpuRelax();
+          }
+          loop *= 2;
+        } else {
+          std::this_thread::yield();
+        }
+      }
+    }
+  }
+
+  void unlock() { lock_.store(false, std::memory_order_release); }
+
+ private:
+  std::atomic<bool> lock_{false};
+};
+
 }  // namespace framework
 }  // namespace paddle
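
Note on the SpinLock added above: lock() is a test-and-test-and-set loop. The uncontended path is a single exchange; under contention the inner loop spins on a relaxed load, which keeps the cache line in a shared state instead of hammering it with read-modify-writes, and backs off exponentially before yielding to the OS. With kMaxLoop = 32, the schedule works out to:

    // failed exchange -> spin rounds of 1, 2, 4, 8, 16, 32 CpuRelax() pauses
    //                    (63 pauses total), doubling each round;
    // once loop reaches 64 (> kMaxLoop) -> std::this_thread::yield()
    //                    repeatedly until the lock is observed free;
    // then control returns to the outer for (;;) to retry the exchange.

The acquire ordering on the winning exchange pairs with the release store in unlock(), so writes made inside the critical section are visible to the next owner of the lock.
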
From d93753c5f01117737b0093ac07c517dba0a508a9 Mon Sep 17 00:00:00 2001
From: liutiexing
Date: Sun, 26 Sep 2021 08:13:55 +0000
Subject: [PATCH 3/3] merge spinlock

---
 .../fluid/framework/new_executor/run_queue.h  |  9 ++--
 .../framework/new_executor/workqueue_utils.h  | 39 -----------------
 paddle/fluid/memory/allocation/spin_lock.h    | 43 ++++++++++++-------
 3 files changed, 32 insertions(+), 59 deletions(-)

diff --git a/paddle/fluid/framework/new_executor/run_queue.h b/paddle/fluid/framework/new_executor/run_queue.h
index 1d32eb05d4d51..e457b20a3c35d 100644
--- a/paddle/fluid/framework/new_executor/run_queue.h
+++ b/paddle/fluid/framework/new_executor/run_queue.h
@@ -38,6 +38,7 @@
 #include <cstdint>
 #include <mutex>
 #include "paddle/fluid/framework/new_executor/workqueue_utils.h"
+#include "paddle/fluid/memory/allocation/spin_lock.h"
 
 namespace paddle {
 namespace framework {
@@ -102,7 +103,7 @@ class RunQueue {
   // PushBack adds w at the end of the queue.
   // If queue is full returns w, otherwise returns default-constructed Work.
   Work PushBack(Work w) {
-    std::unique_lock<SpinLock> lock(mutex_);
+    std::unique_lock<paddle::memory::SpinLock> lock(mutex_);
     unsigned back = back_.load(std::memory_order_relaxed);
     Elem* e = &array_[(back - 1) & kMask];
     uint8_t s = e->state.load(std::memory_order_relaxed);
@@ -124,7 +125,7 @@ class RunQueue {
       return Work();
     }
 
-    std::unique_lock<SpinLock> lock(mutex_);
+    std::unique_lock<paddle::memory::SpinLock> lock(mutex_);
     unsigned back = back_.load(std::memory_order_relaxed);
     Elem* e = &array_[back & kMask];
     uint8_t s = e->state.load(std::memory_order_relaxed);
@@ -146,7 +147,7 @@ class RunQueue {
       return 0;
     }
 
-    std::unique_lock<SpinLock> lock(mutex_);
+    std::unique_lock<paddle::memory::SpinLock> lock(mutex_);
     unsigned back = back_.load(std::memory_order_relaxed);
     unsigned size = Size();
     unsigned mid = back;
@@ -214,7 +215,7 @@ class RunQueue {
   // modification counters.
   alignas(64) std::atomic<unsigned> front_;
   alignas(64) std::atomic<unsigned> back_;
-  SpinLock mutex_;
+  paddle::memory::SpinLock mutex_;
   Elem array_[kSize];
 
   // SizeOrNotEmpty returns current queue size; if NeedSizeEstimate is false,
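
Note: nothing in RunQueue besides the lock type had to change across these three patches, because std::unique_lock only requires the BasicLockable interface, i.e. lock() and unlock() with the usual semantics. A minimal usage sketch against the merged class; the function and variable names here are illustrative:

    #include <mutex>
    #include "paddle/fluid/memory/allocation/spin_lock.h"

    static paddle::memory::SpinLock g_lock;
    static unsigned g_hits = 0;

    void RecordHit() {
      std::unique_lock<paddle::memory::SpinLock> guard(g_lock);
      ++g_hits;  // protected; guard releases the lock on scope exit
    }
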
diff --git a/paddle/fluid/framework/new_executor/workqueue_utils.h b/paddle/fluid/framework/new_executor/workqueue_utils.h
index 1aab4529b023c..bb219fea36267 100644
--- a/paddle/fluid/framework/new_executor/workqueue_utils.h
+++ b/paddle/fluid/framework/new_executor/workqueue_utils.h
@@ -18,12 +18,6 @@
 #include <cstddef>
 #include <cstdlib>
 #include <functional>
-#if defined(_M_X64) || defined(__x86_64__) || defined(_M_IX86) || \
-    defined(__i386__)
-#define __WQ_x86__
-#include <xmmintrin.h>
-#endif
-#include <thread>
 #include "paddle/fluid/platform/enforce.h"
 
 namespace paddle {
@@ -70,38 +64,5 @@ void* AlignedMalloc(size_t size, size_t alignment);
 
 void AlignedFree(void* memory_ptr);
 
-static inline void CpuRelax() {
-#if defined(__WQ_x86__)
-  _mm_pause();
-#endif
-}
-
-class SpinLock {
- public:
-  void lock() {
-    for (;;) {
-      if (!lock_.exchange(true, std::memory_order_acquire)) {
-        break;
-      }
-      constexpr int kMaxLoop = 32;
-      for (int loop = 1; lock_.load(std::memory_order_relaxed);) {
-        if (loop <= kMaxLoop) {
-          for (int i = 1; i <= loop; ++i) {
-            CpuRelax();
-          }
-          loop *= 2;
-        } else {
-          std::this_thread::yield();
-        }
-      }
-    }
-  }
-
-  void unlock() { lock_.store(false, std::memory_order_release); }
-
- private:
-  std::atomic<bool> lock_{false};
-};
-
 }  // namespace framework
 }  // namespace paddle
diff --git a/paddle/fluid/memory/allocation/spin_lock.h b/paddle/fluid/memory/allocation/spin_lock.h
index 42462fd74b4cd..2bbe340e7c691 100644
--- a/paddle/fluid/memory/allocation/spin_lock.h
+++ b/paddle/fluid/memory/allocation/spin_lock.h
@@ -15,37 +15,48 @@
 #pragma once
 
 #include <atomic>
-#if !defined(_WIN32)
-#include <sched.h>
-#else
-#include <windows.h>
-#endif  // !_WIN32
+#if defined(_M_X64) || defined(__x86_64__) || defined(_M_IX86) || \
+    defined(__i386__)
+#define __PADDLE_x86__
+#include <xmmintrin.h>
+#endif
+#include <thread>
 
 #include "paddle/fluid/platform/macros.h"
 
 namespace paddle {
 namespace memory {
+static inline void CpuRelax() {
+#if defined(__PADDLE_x86__)
+  _mm_pause();
+#endif
+}
 
 class SpinLock {
  public:
   SpinLock() : mlock_(false) {}
 
   void lock() {
-    bool expect = false;
-    uint64_t spin_cnt = 0;
-    while (!mlock_.compare_exchange_weak(expect, true)) {
-      expect = false;
-      if ((++spin_cnt & 0xFF) == 0) {
-#if defined(_WIN32)
-        SleepEx(50, FALSE);
-#else
-        sched_yield();
-#endif
+    for (;;) {
+      if (!mlock_.exchange(true, std::memory_order_acquire)) {
+        break;
+      }
+      constexpr int kMaxLoop = 32;
+      for (int loop = 1; mlock_.load(std::memory_order_relaxed);) {
+        if (loop <= kMaxLoop) {
+          for (int i = 1; i <= loop; ++i) {
+            CpuRelax();
+          }
+          loop *= 2;
+        } else {
+          std::this_thread::yield();
+        }
       }
     }
   }
 
-  void unlock() { mlock_.store(false); }
+  void unlock() { mlock_.store(false, std::memory_order_release); }
+  DISABLE_COPY_AND_ASSIGN(SpinLock);
 
  private:
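
For completeness, a small self-contained usage example of the merged lock, assuming spin_lock.h is usable on its own; the counter and thread counts are illustrative. DISABLE_COPY_AND_ASSIGN makes the lock non-copyable, so it is held by reference or as a member, never passed by value:

    #include <thread>
    #include <vector>
    #include "paddle/fluid/memory/allocation/spin_lock.h"

    int main() {
      paddle::memory::SpinLock lock;
      long counter = 0;
      std::vector<std::thread> workers;
      for (int t = 0; t < 4; ++t) {
        workers.emplace_back([&] {
          for (int i = 0; i < 100000; ++i) {
            lock.lock();
            ++counter;  // the spin lock serializes the increments
            lock.unlock();
          }
        });
      }
      for (auto& w : workers) w.join();
      return counter == 4 * 100000 ? 0 : 1;
    }
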