From 56965a052b0be3b9f17a5190e6bab788943a7d73 Mon Sep 17 00:00:00 2001 From: ehds Date: Sat, 24 Jun 2023 22:44:42 +0800 Subject: [PATCH] fix-peers-change-failed-when-cluster-restart-in-joint-status --- src/braft/node.cpp | 13 ++- src/braft/sync_point.cpp | 217 +++++++++++++++++++++++++++++++++++++++ src/braft/sync_point.h | 154 +++++++++++++++++++++++++++ test/test_node.cpp | 118 +++++++++++++++++++++ test/util.h | 7 +- 5 files changed, 500 insertions(+), 9 deletions(-) create mode 100644 src/braft/sync_point.cpp create mode 100644 src/braft/sync_point.h diff --git a/src/braft/node.cpp b/src/braft/node.cpp index f5802f7f..15065e71 100644 --- a/src/braft/node.cpp +++ b/src/braft/node.cpp @@ -33,6 +33,9 @@ #include "braft/node_manager.h" #include "braft/snapshot_executor.h" #include "braft/errno.pb.h" +#include "braft/sync_point.h" +#include "butil/logging.h" + namespace braft { @@ -1585,9 +1588,9 @@ void NodeImpl::pre_vote(std::unique_lock* lck, bool triggered) { " configuration is possibly out of date"; return; } - if (!_conf.contains(_server_id)) { + if (_conf.empty()) { LOG(WARNING) << "node " << _group_id << ':' << _server_id - << " can't do pre_vote as it is not in " << _conf.conf; + << " can't do pre_vote as conf is emtpy"; return; } @@ -1644,9 +1647,9 @@ void NodeImpl::elect_self(std::unique_lock* lck, bool old_leader_stepped_down) { LOG(INFO) << "node " << _group_id << ":" << _server_id << " term " << _current_term << " start vote and grant vote self"; - if (!_conf.contains(_server_id)) { + if (_conf.empty()) { LOG(WARNING) << "node " << _group_id << ':' << _server_id - << " can't do elect_self as it is not in " << _conf.conf; + << " can't do elect_self as _conf is empty"; return; } // cancel follower election timer @@ -2108,7 +2111,6 @@ int NodeImpl::handle_pre_vote_request(const RequestVoteRequest* request, LogId last_log_id = _log_manager->last_log_id(true); lck.lock(); // pre_vote not need ABA check after unlock&lock - int64_t votable_time = 
_follower_lease.votable_time_from_now(); bool grantable = (LogId(request->last_log_index(), request->last_log_term()) >= last_log_id); @@ -3267,6 +3269,7 @@ void NodeImpl::ConfigurationCtx::next_stage() { // implementation. case STAGE_JOINT: _stage = STAGE_STABLE; + TEST_SYNC_POINT_CALLBACK("NodeImpl::ConfigurationCtx:StableStage:BeforeApplyConfiguration", _node); return _node->unsafe_apply_configuration( Configuration(_new_peers), NULL, false); case STAGE_STABLE: diff --git a/src/braft/sync_point.cpp b/src/braft/sync_point.cpp new file mode 100644 index 00000000..518fc785 --- /dev/null +++ b/src/braft/sync_point.cpp @@ -0,0 +1,217 @@ +// Copyright (c) 2017 Baidu.com, Inc. All Rights Reserved +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#include "sync_point.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#ifndef NDEBUG +namespace braft { + +struct SyncPoint::Data { + Data() : enabled_(false) {} + // Enable proper deletion by subclasses + virtual ~Data() {} + // successor/predecessor map loaded from LoadDependency + std::unordered_map> successors_; + std::unordered_map> predecessors_; + std::unordered_map> callbacks_; + std::unordered_map> markers_; + std::unordered_map marked_thread_id_; + + std::mutex mutex_; + std::condition_variable cv_; + // sync points that have been passed through + std::unordered_set cleared_points_; + std::atomic enabled_; + int num_callbacks_running_ = 0; + + void LoadDependency(const std::vector &dependencies); + void LoadDependencyAndMarkers(const std::vector &dependencies, + const std::vector &markers); + bool PredecessorsAllCleared(const std::string &point); + void SetCallBack(const std::string &point, + const std::function &callback) { + std::lock_guard lock(mutex_); + callbacks_[point] = callback; + } + + void ClearCallBack(const std::string &point); + void ClearAllCallBacks(); + void EnableProcessing() { enabled_ = true; } + void DisableProcessing() { enabled_ = false; } + void ClearTrace() { + std::lock_guard lock(mutex_); + cleared_points_.clear(); + } + bool DisabledByMarker(const std::string &point, std::thread::id thread_id) { + auto marked_point_iter = marked_thread_id_.find(point); + return marked_point_iter != marked_thread_id_.end() && + thread_id != marked_point_iter->second; + } + void Process(const std::string &point, void *cb_arg); +}; + +SyncPoint *SyncPoint::GetInstance() { + static SyncPoint sync_point; + return &sync_point; +} + +SyncPoint::SyncPoint() : impl_(new Data) {} + +SyncPoint::~SyncPoint() { delete impl_; } + +void SyncPoint::LoadDependency(const std::vector &dependencies) { + impl_->LoadDependency(dependencies); +} + +void SyncPoint::LoadDependencyAndMarkers( + const std::vector 
&dependencies, + const std::vector &markers) { + impl_->LoadDependencyAndMarkers(dependencies, markers); +} + +void SyncPoint::SetCallBack(const std::string &point, + const std::function &callback) { + impl_->SetCallBack(point, callback); +} + +void SyncPoint::ClearCallBack(const std::string &point) { + impl_->ClearCallBack(point); +} + +void SyncPoint::ClearAllCallBacks() { impl_->ClearAllCallBacks(); } + +void SyncPoint::EnableProcessing() { impl_->EnableProcessing(); } + +void SyncPoint::DisableProcessing() { impl_->DisableProcessing(); } + +void SyncPoint::ClearTrace() { impl_->ClearTrace(); } + +void SyncPoint::Process(const std::string &point, void *cb_arg) { + impl_->Process(point, cb_arg); +} + +void SyncPoint::Data::LoadDependency( + const std::vector &dependencies) { + std::lock_guard lock(mutex_); + successors_.clear(); + predecessors_.clear(); + cleared_points_.clear(); + for (const auto &dependency : dependencies) { + successors_[dependency.predecessor].push_back(dependency.successor); + predecessors_[dependency.successor].push_back(dependency.predecessor); + } + cv_.notify_all(); +} + +void SyncPoint::Data::LoadDependencyAndMarkers( + const std::vector &dependencies, + const std::vector &markers) { + std::lock_guard lock(mutex_); + successors_.clear(); + predecessors_.clear(); + cleared_points_.clear(); + markers_.clear(); + marked_thread_id_.clear(); + for (const auto &dependency : dependencies) { + successors_[dependency.predecessor].push_back(dependency.successor); + predecessors_[dependency.successor].push_back(dependency.predecessor); + } + for (const auto &marker : markers) { + successors_[marker.predecessor].push_back(marker.successor); + predecessors_[marker.successor].push_back(marker.predecessor); + markers_[marker.predecessor].push_back(marker.successor); + } + cv_.notify_all(); +} + +bool SyncPoint::Data::PredecessorsAllCleared(const std::string &point) { + for (const auto &pred : predecessors_[point]) { + if (cleared_points_.count(pred) 
== 0) { + return false; + } + } + return true; +} + +void SyncPoint::Data::ClearCallBack(const std::string &point) { + std::unique_lock lock(mutex_); + while (num_callbacks_running_ > 0) { + cv_.wait(lock); + } + callbacks_.erase(point); +} + +void SyncPoint::Data::ClearAllCallBacks() { + std::unique_lock lock(mutex_); + while (num_callbacks_running_ > 0) { + cv_.wait(lock); + } + callbacks_.clear(); +} + +void SyncPoint::Data::Process(const std::string &point, void *cb_arg) { + if (!enabled_) { + return; + } + + std::unique_lock lock(mutex_); + auto thread_id = std::this_thread::get_id(); + + auto marker_iter = markers_.find(point); + if (marker_iter != markers_.end()) { + for (auto &marked_point : marker_iter->second) { + marked_thread_id_.emplace(marked_point, thread_id); + } + } + + if (DisabledByMarker(point, thread_id)) { + return; + } + + while (!PredecessorsAllCleared(point)) { + cv_.wait(lock); + if (DisabledByMarker(point, thread_id)) { + return; + } + } + + auto callback_pair = callbacks_.find(point); + if (callback_pair != callbacks_.end()) { + num_callbacks_running_++; + mutex_.unlock(); + callback_pair->second(cb_arg); + mutex_.lock(); + num_callbacks_running_--; + } + cleared_points_.insert(point); + cv_.notify_all(); +} +} // namespace braft +#endif // NDEBUG diff --git a/src/braft/sync_point.h b/src/braft/sync_point.h new file mode 100644 index 00000000..0f8eb866 --- /dev/null +++ b/src/braft/sync_point.h @@ -0,0 +1,154 @@ +// Copyright (c) 2017 Baidu.com, Inc. All Rights Reserved +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once + +#include + +#include +#include +#include +#include +#include + +#ifdef NDEBUG +#define TEST_SYNC_POINT(x) +#define TEST_IDX_SYNC_POINT(x, index) +#define TEST_SYNC_POINT_CALLBACK(x, y) +#define INIT_SYNC_POINT_SINGLETONS() +#else + +namespace braft { + +// This class provides facility to reproduce race conditions deterministically +// in unit tests. +// Developer could specify sync points in the codebase via TEST_SYNC_POINT. +// Each sync point represents a position in the execution stream of a thread. +// In the unit test, 'Happens After' relationship among sync points could be +// setup via SyncPoint::LoadDependency, to reproduce a desired interleave of +// threads execution. + +class SyncPoint { + public: + static SyncPoint* GetInstance(); + + SyncPoint(const SyncPoint&) = delete; + SyncPoint& operator=(const SyncPoint&) = delete; + ~SyncPoint(); + + struct SyncPointPair { + std::string predecessor; + std::string successor; + }; + + // call once at the beginning of a test to setup the dependency between + // sync points. Specifically, execution will not be allowed to proceed past + // each successor until execution has reached the corresponding predecessor, + // in any thread. + void LoadDependency(const std::vector& dependencies); + + // call once at the beginning of a test to setup the dependency between + // sync points and setup markers indicating the successor is only enabled + // when it is processed on the same thread as the predecessor. + // When adding a marker, it implicitly adds a dependency for the marker pair. 
+ void LoadDependencyAndMarkers(const std::vector& dependencies, + const std::vector& markers); + + // The argument to the callback is passed through from + // TEST_SYNC_POINT_CALLBACK(); nullptr if TEST_SYNC_POINT or + // TEST_IDX_SYNC_POINT was used. + void SetCallBack(const std::string& point, + const std::function& callback); + + // Clear callback function by point + void ClearCallBack(const std::string& point); + + // Clear all call back functions. + void ClearAllCallBacks(); + + // enable sync point processing (disabled on startup) + void EnableProcessing(); + + // disable sync point processing + void DisableProcessing(); + + // remove the execution trace of all sync points + void ClearTrace(); + + // triggered by TEST_SYNC_POINT, blocking execution until all predecessors + // are executed. + // And/or call registered callback function, with argument `cb_arg` + void Process(const std::string& point, void* cb_arg = nullptr); + + // template gets length of const string at compile time, + // avoiding strlen() at runtime + template + void Process(const char (&point)[kLen], void* cb_arg = nullptr) { + static_assert(kLen > 0, "Must not be empty"); + assert(point[kLen - 1] == '\0'); + Process(std::string(point, kLen - 1), cb_arg); + } + + // TODO: it might be useful to provide a function that blocks until all + // sync points are cleared. + + // We want this to be public so we can + // subclass the implementation + struct Data; + + private: + // Singleton + SyncPoint(); + Data* impl_; +}; + +// Sets up sync points to mock direct IO instead of actually issuing direct IO +// to the file system. +void SetupSyncPointsToMockDirectIO(); +} // namespace braft + +// Use TEST_SYNC_POINT to specify sync points inside code base. +// Sync points can have happens-after dependency on other sync points, +// configured at runtime via SyncPoint::LoadDependency. This could be +// utilized to re-produce race conditions between threads. 
+// See TransactionLogIteratorRace in db_test.cc for an example use case. +// TEST_SYNC_POINT is no op in release build. +#define TEST_SYNC_POINT(x) \ + braft::SyncPoint::GetInstance()->Process(x) +#define TEST_IDX_SYNC_POINT(x, index) \ + braft::SyncPoint::GetInstance()->Process(x + \ + std::to_string(index)) +#define TEST_SYNC_POINT_CALLBACK(x, y) \ + braft::SyncPoint::GetInstance()->Process(x, y) +#define INIT_SYNC_POINT_SINGLETONS() \ + (void)braft::SyncPoint::GetInstance(); +#endif // NDEBUG + +// Callback sync point for any read IO errors that should be ignored by +// the fault injection framework +// Disable in release mode +#ifdef NDEBUG +#define IGNORE_STATUS_IF_ERROR(_status_) +#else +#define IGNORE_STATUS_IF_ERROR(_status_) \ + { \ + if (!_status_.ok()) { \ + TEST_SYNC_POINT("FaultInjectionIgnoreError"); \ + } \ + } +#endif // NDEBUG diff --git a/test/test_node.cpp b/test/test_node.cpp index fc5f14bb..9b6f2e49 100644 --- a/test/test_node.cpp +++ b/test/test_node.cpp @@ -4,11 +4,13 @@ // Author: WangYao (fisherman), wangyao02@baidu.com // Date: 2015/10/08 17:00:05 +#include #include #include #include #include #include +#include #include #include #include @@ -16,6 +18,8 @@ #include #include #include +#include +#include #include "../test/util.h" namespace braft { @@ -2756,6 +2760,120 @@ TEST_P(NodeTest, change_peers_steps_down_in_joint_consensus) { ASSERT_TRUE(!leader->_impl->_conf_ctx.is_busy()); } +// A test case used to reboot cluster between joint stage and stable stage, +// during this time the new configuration has not been replicated to C_{new} peers, but +// replicated to diff(C_{old,new}, C_{new}). +TEST_P(NodeTest, change_peers_restart_cluster_before_stable_stage) { + // This case does the following steps: + // 1. start a five nodes cluster (peer0,1,2,3,4). + // 2. remove 3 nodes (from {peer0,1,2,3,4} to {peer3,4}). + // 3. stop {peer3,4} before the leader starts to replicate new configuration. + // 4.
reboot cluster, and check membership change can be continued to + // complete successfully. + std::vector all_peers; + braft::PeerId peer0("127.0.0.1:5006"); + braft::PeerId peer1("127.0.0.1:5007"); + braft::PeerId peer2("127.0.0.1:5008"); + braft::PeerId peer3("127.0.0.1:5009"); + braft::PeerId peer4("127.0.0.1:5010"); + all_peers = {peer0, peer1, peer2, peer3, peer4}; + + Cluster cluster("unittest", all_peers); + for (const auto &peer : all_peers) { + ASSERT_EQ(cluster.start(peer.addr), 0); + } + LOG(INFO) << "started five nodes cluster"; + cluster.wait_leader(); + braft::Node* leader = cluster.leader(); + + // Transfer leadership to peer0 because it is not in `keep_peers`. + ASSERT_EQ(0, leader->transfer_leadership_to(peer0)); + cluster.wait_leader(); + leader = cluster.leader(); + ASSERT_EQ(leader->leader_id(), peer0); + + bthread::CountdownEvent cond(10); + for (int i = 0; i < 10; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello: %d", i + 1); + data.append(data_buf); + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, 0); + leader->apply(task); + } + cond.wait(); + + // Start to change peers from 5 to 2, just keep peer3 and peer4. + // + // After joint configuration (`C_old` and `C_new`) has been committed + // and before applying stable configuration (`Cnew`), we stop peer3 and peer4, + // so `Cnew` can not be committed. Then restart cluster to continue membership + // change process.
+ std::vector keep_peers = {peer3, peer4}; + SyncPoint::GetInstance()->SetCallBack( + "NodeImpl::ConfigurationCtx:StableStage:BeforeApplyConfiguration", + [&cluster, &keep_peers](void *) -> void { + for (const auto &peer : keep_peers) { + ASSERT_EQ(cluster.stop(peer.addr), 0); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + braft::SynchronizedClosure done; + Configuration conf(keep_peers); + leader->change_peers(conf, &done); + done.wait(); + ASSERT_EQ(EPERM, done.status().error_code()); + // Current leader has received Cnew. + ASSERT_TRUE(leader->_impl->_conf.stable()); + + // Restart cluster + cluster.stop_all(); + SyncPoint::GetInstance()->DisableProcessing(); + + Cluster new_cluster("unittest", keep_peers); + for (const auto &peer : all_peers) { + ASSERT_EQ(cluster.start(peer.addr), 0); + } + LOG(INFO) << "restarted five nodes cluste"; + + // Waiting leader to be elected from `keep_peers` eventually. + int wait_count = 10; + while (wait_count > 0) { + cluster.wait_leader(); + leader = cluster.leader(); + if (std::find(keep_peers.begin(), keep_peers.end(), + leader->leader_id()) != keep_peers.end()) { + break; + } + usleep(5 * 1000); + --wait_count; + } + ASSERT_GT(wait_count, 0); + cluster.check_node_status(); + + // Now we can remove unused nodes safely. + for (const auto &peer : all_peers) { + if (std::find(keep_peers.begin(), keep_peers.end(), + peer) == keep_peers.end()) { + ASSERT_EQ(cluster.stop(peer.addr), 0); + } + } + + leader = cluster.leader(); + ASSERT_TRUE(leader->_impl->_conf.stable()); + // Check the new configuration must be the same as `keep_peers`. 
+ std::vector new_peers; + leader->list_peers(&new_peers); + for (const auto &peer : new_peers) { + ASSERT_TRUE(std::find(keep_peers.begin(), keep_peers.end(), peer) != + keep_peers.end()); + } + cluster.ensure_same(); +} + struct ChangeArg { Cluster* c; std::vector peers; diff --git a/test/util.h b/test/util.h index e6984920..7c9dbd7e 100644 --- a/test/util.h +++ b/test/util.h @@ -264,9 +264,8 @@ class Cluster { butil::endpoint2str(listen_addr).c_str()); butil::string_printf(&options.snapshot_uri, "local://./data/%s/snapshot", butil::endpoint2str(listen_addr).c_str()); - - scoped_refptr tst(_throttle); - options.snapshot_throttle = &tst; + + options.snapshot_throttle = &_throttle; options.catchup_margin = 2; @@ -516,7 +515,7 @@ class Cluster { int32_t _election_timeout_ms; int32_t _max_clock_drift_ms; raft_mutex_t _mutex; - braft::SnapshotThrottle* _throttle; + scoped_refptr _throttle; }; #endif // ~PUBLIC_RAFT_TEST_UTIL_H