forked from baidu/braft
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix-peers-change-failed-when-cluster-restart-in-joint-status
- Loading branch information
Showing
5 changed files
with
466 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,203 @@ | ||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. | ||
// This source code is licensed under both the GPLv2 (found in the | ||
// COPYING file in the root directory) and Apache 2.0 License | ||
// (found in the LICENSE.Apache file in the root directory). | ||
|
||
#include "sync_point.h" | ||
|
||
#include <atomic> | ||
#include <condition_variable> | ||
#include <functional> | ||
#include <mutex> | ||
#include <string> | ||
#include <thread> | ||
#include <unordered_map> | ||
#include <unordered_set> | ||
|
||
#include <fcntl.h> | ||
|
||
#ifndef NDEBUG | ||
namespace braft { | ||
|
||
// Private state and logic behind the SyncPoint facade. The containers are
// read and written with mutex_ held; enabled_ is an atomic flag that is
// checked without the lock.
struct SyncPoint::Data {
  Data() : enabled_(false) {}
  // Virtual so that a subclass can be destroyed through a Data pointer.
  virtual ~Data() = default;

  // Dependency edges filled by LoadDependency / LoadDependencyAndMarkers,
  // indexed by predecessor name and by successor name respectively.
  std::unordered_map<std::string, std::vector<std::string>> successors_;
  std::unordered_map<std::string, std::vector<std::string>> predecessors_;
  // Callback registered for a sync point name.
  std::unordered_map<std::string, std::function<void(void *)>> callbacks_;
  // Marker predecessor -> the points it marks.
  std::unordered_map<std::string, std::vector<std::string>> markers_;
  // Marked point -> id of the thread that processed its marker predecessor.
  std::unordered_map<std::string, std::thread::id> marked_thread_id_;

  std::mutex mutex_;
  std::condition_variable cv_;
  // Names of the sync points that have already been passed through.
  std::unordered_set<std::string> cleared_points_;
  // Processing switch; false until EnableProcessing() is called.
  std::atomic<bool> enabled_;
  // Number of callbacks that are currently executing.
  int num_callbacks_running_ = 0;

  void LoadDependency(const std::vector<SyncPointPair> &dependencies);
  void LoadDependencyAndMarkers(const std::vector<SyncPointPair> &dependencies,
                                const std::vector<SyncPointPair> &markers);
  bool PredecessorsAllCleared(const std::string &point);

  // Registers (or replaces) the callback invoked when `point` is processed.
  void SetCallBack(const std::string &point,
                   const std::function<void(void *)> &callback) {
    std::lock_guard<std::mutex> guard(mutex_);
    callbacks_[point] = callback;
  }

  void ClearCallBack(const std::string &point);
  void ClearAllCallBacks();

  // Turn sync point processing on / off.
  void EnableProcessing() { enabled_.store(true); }
  void DisableProcessing() { enabled_.store(false); }

  // Forgets which sync points have been passed through so far.
  void ClearTrace() {
    std::lock_guard<std::mutex> guard(mutex_);
    cleared_points_.clear();
  }

  // Returns true when `point` has been marked by a thread other than
  // `thread_id`. Does not lock; Process() calls it with mutex_ held.
  bool DisabledByMarker(const std::string &point, std::thread::id thread_id) {
    const auto marked = marked_thread_id_.find(point);
    if (marked == marked_thread_id_.end()) {
      return false;
    }
    return thread_id != marked->second;
  }

  void Process(const std::string &point, void *cb_arg);
};
|
||
// Returns the process-wide SyncPoint, constructed on first use as a
// function-local static.
SyncPoint *SyncPoint::GetInstance() {
  static SyncPoint instance;
  SyncPoint *const singleton = &instance;
  return singleton;
}
|
||
// Allocates the implementation object that every public method forwards to.
SyncPoint::SyncPoint()
    : impl_(new Data()) {
}
|
||
// Releases the implementation object owned through impl_.
SyncPoint::~SyncPoint() {
  delete impl_;
}
|
||
void SyncPoint::LoadDependency(const std::vector<SyncPointPair> &dependencies) { | ||
impl_->LoadDependency(dependencies); | ||
} | ||
|
||
void SyncPoint::LoadDependencyAndMarkers( | ||
const std::vector<SyncPointPair> &dependencies, | ||
const std::vector<SyncPointPair> &markers) { | ||
impl_->LoadDependencyAndMarkers(dependencies, markers); | ||
} | ||
|
||
void SyncPoint::SetCallBack(const std::string &point, | ||
const std::function<void(void *)> &callback) { | ||
impl_->SetCallBack(point, callback); | ||
} | ||
|
||
void SyncPoint::ClearCallBack(const std::string &point) { | ||
impl_->ClearCallBack(point); | ||
} | ||
|
||
// Forwards to the implementation: removes every registered callback.
void SyncPoint::ClearAllCallBacks() {
  impl_->ClearAllCallBacks();
}
|
||
// Forwards to the implementation: switches sync point processing on.
void SyncPoint::EnableProcessing() {
  impl_->EnableProcessing();
}
|
||
// Forwards to the implementation: switches sync point processing off.
void SyncPoint::DisableProcessing() {
  impl_->DisableProcessing();
}
|
||
// Forwards to the implementation: forgets which sync points were passed.
void SyncPoint::ClearTrace() {
  impl_->ClearTrace();
}
|
||
void SyncPoint::Process(const std::string &point, void *cb_arg) { | ||
impl_->Process(point, cb_arg); | ||
} | ||
|
||
void SyncPoint::Data::LoadDependency( | ||
const std::vector<SyncPointPair> &dependencies) { | ||
std::lock_guard<std::mutex> lock(mutex_); | ||
successors_.clear(); | ||
predecessors_.clear(); | ||
cleared_points_.clear(); | ||
for (const auto &dependency : dependencies) { | ||
successors_[dependency.predecessor].push_back(dependency.successor); | ||
predecessors_[dependency.successor].push_back(dependency.predecessor); | ||
} | ||
cv_.notify_all(); | ||
} | ||
|
||
void SyncPoint::Data::LoadDependencyAndMarkers( | ||
const std::vector<SyncPointPair> &dependencies, | ||
const std::vector<SyncPointPair> &markers) { | ||
std::lock_guard<std::mutex> lock(mutex_); | ||
successors_.clear(); | ||
predecessors_.clear(); | ||
cleared_points_.clear(); | ||
markers_.clear(); | ||
marked_thread_id_.clear(); | ||
for (const auto &dependency : dependencies) { | ||
successors_[dependency.predecessor].push_back(dependency.successor); | ||
predecessors_[dependency.successor].push_back(dependency.predecessor); | ||
} | ||
for (const auto &marker : markers) { | ||
successors_[marker.predecessor].push_back(marker.successor); | ||
predecessors_[marker.successor].push_back(marker.predecessor); | ||
markers_[marker.predecessor].push_back(marker.successor); | ||
} | ||
cv_.notify_all(); | ||
} | ||
|
||
bool SyncPoint::Data::PredecessorsAllCleared(const std::string &point) { | ||
for (const auto &pred : predecessors_[point]) { | ||
if (cleared_points_.count(pred) == 0) { | ||
return false; | ||
} | ||
} | ||
return true; | ||
} | ||
|
||
void SyncPoint::Data::ClearCallBack(const std::string &point) { | ||
std::unique_lock<std::mutex> lock(mutex_); | ||
while (num_callbacks_running_ > 0) { | ||
cv_.wait(lock); | ||
} | ||
callbacks_.erase(point); | ||
} | ||
|
||
void SyncPoint::Data::ClearAllCallBacks() { | ||
std::unique_lock<std::mutex> lock(mutex_); | ||
while (num_callbacks_running_ > 0) { | ||
cv_.wait(lock); | ||
} | ||
callbacks_.clear(); | ||
} | ||
|
||
void SyncPoint::Data::Process(const std::string &point, void *cb_arg) { | ||
if (!enabled_) { | ||
return; | ||
} | ||
|
||
std::unique_lock<std::mutex> lock(mutex_); | ||
auto thread_id = std::this_thread::get_id(); | ||
|
||
auto marker_iter = markers_.find(point); | ||
if (marker_iter != markers_.end()) { | ||
for (auto &marked_point : marker_iter->second) { | ||
marked_thread_id_.emplace(marked_point, thread_id); | ||
} | ||
} | ||
|
||
if (DisabledByMarker(point, thread_id)) { | ||
return; | ||
} | ||
|
||
while (!PredecessorsAllCleared(point)) { | ||
cv_.wait(lock); | ||
if (DisabledByMarker(point, thread_id)) { | ||
return; | ||
} | ||
} | ||
|
||
auto callback_pair = callbacks_.find(point); | ||
if (callback_pair != callbacks_.end()) { | ||
num_callbacks_running_++; | ||
mutex_.unlock(); | ||
callback_pair->second(cb_arg); | ||
mutex_.lock(); | ||
num_callbacks_running_--; | ||
} | ||
cleared_points_.insert(point); | ||
cv_.notify_all(); | ||
} | ||
} // namespace braft | ||
#endif // NDEBUG |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. | ||
// This source code is licensed under both the GPLv2 (found in the | ||
// COPYING file in the root directory) and Apache 2.0 License | ||
// (found in the LICENSE.Apache file in the root directory). | ||
#pragma once | ||
|
||
#include <assert.h> | ||
|
||
#include <functional> | ||
#include <mutex> | ||
#include <string> | ||
#include <thread> | ||
#include <vector> | ||
|
||
#ifdef NDEBUG | ||
// Release build (NDEBUG defined): every sync point macro expands to nothing,
// so the arguments are never evaluated.
#define TEST_SYNC_POINT(x)
#define TEST_IDX_SYNC_POINT(x, index)
#define TEST_SYNC_POINT_CALLBACK(x, y)
#define INIT_SYNC_POINT_SINGLETONS()
#else | ||
|
||
namespace braft { | ||
|
||
// SyncPoint lets unit tests reproduce race conditions deterministically.
// Code under test names positions in a thread's execution with
// TEST_SYNC_POINT. A test then declares "happens after" relationships between
// those positions through SyncPoint::LoadDependency, which forces the threads
// into the desired interleaving.
class SyncPoint {
 public:
  // One ordering constraint: `successor` may not be passed until
  // `predecessor` has been reached.
  struct SyncPointPair {
    std::string predecessor;
    std::string successor;
  };

  // Implementation type. It is declared in the public section so that the
  // implementation can be subclassed.
  struct Data;

  // Access to the single instance.
  static SyncPoint* GetInstance();

  // Not copyable.
  SyncPoint(const SyncPoint&) = delete;
  SyncPoint& operator=(const SyncPoint&) = delete;
  ~SyncPoint();

  // Call once at the start of a test to set up the dependencies between sync
  // points. Execution is not allowed to proceed past a successor until
  // execution has reached the corresponding predecessor, in any thread.
  void LoadDependency(const std::vector<SyncPointPair>& dependencies);

  // Call once at the start of a test to set up both dependencies and markers.
  // A marker means the successor is only enabled when it is processed on the
  // same thread as the predecessor. Adding a marker implicitly adds a
  // dependency for the same pair.
  void LoadDependencyAndMarkers(const std::vector<SyncPointPair>& dependencies,
                                const std::vector<SyncPointPair>& markers);

  // Registers `callback` for `point`. The callback receives the argument
  // passed through from TEST_SYNC_POINT_CALLBACK(), or nullptr when
  // TEST_SYNC_POINT or TEST_IDX_SYNC_POINT was used.
  void SetCallBack(const std::string& point,
                   const std::function<void(void*)>& callback);

  // Removes the callback registered for `point`.
  void ClearCallBack(const std::string& point);

  // Removes all registered callbacks.
  void ClearAllCallBacks();

  // Turns sync point processing on (it is off on startup).
  void EnableProcessing();

  // Turns sync point processing off.
  void DisableProcessing();

  // Discards the execution trace of all sync points.
  void ClearTrace();

  // Triggered by TEST_SYNC_POINT: blocks until all predecessors of `point`
  // have been executed, and/or calls the registered callback with `cb_arg`.
  void Process(const std::string& point, void* cb_arg = nullptr);

  // Overload for string literals: the array bound supplies the length at
  // compile time, which avoids a strlen() at runtime.
  template <size_t N>
  void Process(const char (&point)[N], void* cb_arg = nullptr) {
    static_assert(N > 0, "Must not be empty");
    assert(point[N - 1] == '\0');
    const std::string name(point, N - 1);
    Process(name, cb_arg);
  }

  // TODO: it might be useful to provide a function that blocks until all
  // sync points are cleared.

 private:
  // Singleton: construction is restricted to the class itself.
  SyncPoint();
  Data* impl_;
};
|
||
// Sets up sync points to mock direct IO instead of actually issuing direct IO | ||
// to the file system. | ||
void SetupSyncPointsToMockDirectIO(); | ||
} // namespace braft | ||
|
||
// Use TEST_SYNC_POINT to specify sync points inside the code base.
// Sync points can have happens-after dependencies on other sync points,
// configured at runtime via SyncPoint::LoadDependency. This can be used to
// reproduce race conditions between threads.
// See TransactionLogIteratorRace in db_test.cc for an example use case.
// TEST_SYNC_POINT is a no-op in release builds.
//
//   TEST_SYNC_POINT(x)              process the sync point named x
//   TEST_IDX_SYNC_POINT(x, index)   process the sync point named
//                                   x + std::to_string(index)
//   TEST_SYNC_POINT_CALLBACK(x, y)  process x and pass y to its callback
//   INIT_SYNC_POINT_SINGLETONS()    force construction of the singleton
//
// In TEST_IDX_SYNC_POINT the name argument is parenthesised so that an
// expression argument (for example a conditional) is concatenated as a whole.
// INIT_SYNC_POINT_SINGLETONS keeps its trailing semicolon so that existing
// uses written without one still compile.
#define TEST_SYNC_POINT(x) \
  braft::SyncPoint::GetInstance()->Process(x)
#define TEST_IDX_SYNC_POINT(x, index)         \
  braft::SyncPoint::GetInstance()->Process((x) + \
                                           std::to_string(index))
#define TEST_SYNC_POINT_CALLBACK(x, y) \
  braft::SyncPoint::GetInstance()->Process(x, y)
#define INIT_SYNC_POINT_SINGLETONS() \
  (void)braft::SyncPoint::GetInstance();
#endif // NDEBUG | ||
|
||
// Callback sync point for any read IO errors that should be ignored by
// the fault injection framework.
// Disabled in release mode, where the macro expands to nothing.
//
// In debug builds the sync point "FaultInjectionIgnoreError" is processed
// whenever `_status_.ok()` is false. The argument is parenthesised so that an
// expression argument is evaluated as a whole before .ok() is called on it.
#ifdef NDEBUG
#define IGNORE_STATUS_IF_ERROR(_status_)
#else
#define IGNORE_STATUS_IF_ERROR(_status_)            \
  {                                                 \
    if (!(_status_).ok()) {                         \
      TEST_SYNC_POINT("FaultInjectionIgnoreError"); \
    }                                               \
  }
#endif  // NDEBUG
Oops, something went wrong.