Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spinlock #36030

Merged
merged 6 commits into from
Sep 29, 2021
Merged

Spinlock #36030

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions paddle/fluid/framework/new_executor/run_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
#include <cstdint>
#include <mutex>
#include <vector>
#include "paddle/fluid/framework/new_executor/workqueue_utils.h"
#include "paddle/fluid/memory/allocation/spin_lock.h"

namespace paddle {
namespace framework {
Expand Down Expand Up @@ -101,7 +103,7 @@ class RunQueue {
// PushBack adds w at the end of the queue.
// If queue is full returns w, otherwise returns default-constructed Work.
Work PushBack(Work w) {
std::unique_lock<std::mutex> lock(mutex_);
std::unique_lock<paddle::memory::SpinLock> lock(mutex_);
unsigned back = back_.load(std::memory_order_relaxed);
Elem* e = &array_[(back - 1) & kMask];
uint8_t s = e->state.load(std::memory_order_relaxed);
Expand All @@ -123,7 +125,7 @@ class RunQueue {
return Work();
}

std::unique_lock<std::mutex> lock(mutex_);
std::unique_lock<paddle::memory::SpinLock> lock(mutex_);
unsigned back = back_.load(std::memory_order_relaxed);
Elem* e = &array_[back & kMask];
uint8_t s = e->state.load(std::memory_order_relaxed);
Expand All @@ -145,7 +147,7 @@ class RunQueue {
return 0;
}

std::unique_lock<std::mutex> lock(mutex_);
std::unique_lock<paddle::memory::SpinLock> lock(mutex_);
unsigned back = back_.load(std::memory_order_relaxed);
unsigned size = Size();
unsigned mid = back;
Expand Down Expand Up @@ -213,7 +215,7 @@ class RunQueue {
// modification counters.
alignas(64) std::atomic<unsigned> front_;
alignas(64) std::atomic<unsigned> back_;
std::mutex mutex_;
paddle::memory::SpinLock mutex_;
Elem array_[kSize];

// SizeOrNotEmpty returns current queue size; if NeedSizeEstimate is false,
Expand Down
4 changes: 2 additions & 2 deletions paddle/fluid/framework/new_executor/workqueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ std::unique_ptr<WorkQueue> CreateMultiThreadedWorkQueue(
"WorkQueueOptions.num_threads must be "
"greater than 1."));
std::unique_ptr<WorkQueue> ptr(new WorkQueueImpl(options));
return ptr;
return std::move(ptr);
}

std::unique_ptr<WorkQueueGroup> CreateWorkQueueGroup(
Expand All @@ -176,7 +176,7 @@ std::unique_ptr<WorkQueueGroup> CreateWorkQueueGroup(
"For a WorkQueueGroup, the number of WorkQueueOptions "
"must be greater than 1."));
std::unique_ptr<WorkQueueGroup> ptr(new WorkQueueGroupImpl(queues_options));
return ptr;
return std::move(ptr);
}

} // namespace framework
Expand Down
1 change: 1 addition & 0 deletions paddle/fluid/framework/new_executor/workqueue_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdlib>
Expand Down
43 changes: 27 additions & 16 deletions paddle/fluid/memory/allocation/spin_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,48 @@
#pragma once

#include <atomic>
#if !defined(_WIN32)
#include <sched.h>
#else
#include <windows.h>
#endif // !_WIN32
#if defined(_M_X64) || defined(__x86_64__) || defined(_M_IX86) || \
defined(__i386__)
#define __PADDLE_x86__
#include <immintrin.h>
#endif
#include <thread>

#include "paddle/fluid/platform/macros.h"

namespace paddle {
namespace memory {
// Spin-wait hint intended for the inner iterations of a busy-wait loop.
// When __PADDLE_x86__ is defined (set earlier in this header for x86 and
// x86-64 targets) it executes the _mm_pause() intrinsic; on every other
// target the body is empty and the call does nothing.
static inline void CpuRelax() {
#ifdef __PADDLE_x86__
  _mm_pause();
#endif
}

class SpinLock {
public:
SpinLock() : mlock_(false) {}

void lock() {
bool expect = false;
uint64_t spin_cnt = 0;
while (!mlock_.compare_exchange_weak(expect, true)) {
expect = false;
if ((++spin_cnt & 0xFF) == 0) {
#if defined(_WIN32)
SleepEx(50, FALSE);
#else
sched_yield();
#endif
for (;;) {
if (!mlock_.exchange(true, std::memory_order_acquire)) {
break;
}
constexpr int kMaxLoop = 32;
for (int loop = 1; mlock_.load(std::memory_order_relaxed);) {
if (loop <= kMaxLoop) {
for (int i = 1; i <= loop; ++i) {
CpuRelax();
}
loop *= 2;
} else {
std::this_thread::yield();
}
}
}
}

void unlock() { mlock_.store(false); }
void unlock() { mlock_.store(false, std::memory_order_release); }

DISABLE_COPY_AND_ASSIGN(SpinLock);

private:
Expand Down