From dbc827eba6dc0fa8cbe9849106a3c83ff4c285ad Mon Sep 17 00:00:00 2001
From: LiYiChao
Date: Tue, 29 Nov 2022 21:47:36 +0800
Subject: [PATCH] raft arbiter support.

Signed-off-by: Yichao Li
---
 src/braft/fsm_caller.cpp  | 25 ++++++++++++++++++++-----
 src/braft/log_manager.cpp |  4 +++-
 src/braft/log_manager.h   |  5 +++++
 src/braft/node.cpp        | 32 ++++++++++++++++----------------
 src/braft/node.h          |  3 +++
 src/braft/raft.cpp        |  3 +++
 src/braft/raft.h          |  8 ++++++++
 src/braft/raft.proto      |  1 +
 src/braft/replicator.cpp  | 14 ++++++++++++++
 src/braft/replicator.h    |  3 +++
 10 files changed, 76 insertions(+), 22 deletions(-)

diff --git a/src/braft/fsm_caller.cpp b/src/braft/fsm_caller.cpp
index 98913eea..8e310635 100644
--- a/src/braft/fsm_caller.cpp
+++ b/src/braft/fsm_caller.cpp
@@ -298,7 +298,11 @@ void FSMCaller::do_committed(int64_t committed_index) {
             continue;
         }
         Iterator iter(&iter_impl);
-        _fsm->on_apply(iter);
+        if (!_node->arbiter()) {
+            _fsm->on_apply(iter);
+        } else {
+            for (; iter.valid(); iter.next()) {}
+        }
         LOG_IF(ERROR, iter.valid())
                 << "Node " << _node->node_id()
                 << " Iterator is still valid, did you return before iterator "
@@ -353,7 +357,11 @@ void FSMCaller::do_snapshot_save(SaveSnapshotClosure* done) {
         return;
     }
 
-    _fsm->on_snapshot_save(writer, done);
+    if (!_node->arbiter()) {
+        _fsm->on_snapshot_save(writer, done);
+    } else {
+        done->Run();
+    }
     return;
 }
 
@@ -402,7 +410,10 @@ void FSMCaller::do_snapshot_load(LoadSnapshotClosure* done) {
         return done->Run();
     }
 
-    ret = _fsm->on_snapshot_load(reader);
+    if (!_node->arbiter()) {
+        ret = _fsm->on_snapshot_load(reader);
+    }
+
     if (ret != 0) {
         done->status().set_error(ret, "StateMachine on_snapshot_load failed");
         done->Run();
@@ -454,12 +465,16 @@ int FSMCaller::on_leader_start(int64_t term, int64_t lease_epoch) {
 }
 
 void FSMCaller::do_leader_stop(const butil::Status& status) {
-    _fsm->on_leader_stop(status);
+    if (!_node->arbiter()) {
+        _fsm->on_leader_stop(status);
+    }
 }
 
 void FSMCaller::do_leader_start(const LeaderStartContext& leader_start_context) {
     _node->leader_lease_start(leader_start_context.lease_epoch);
-    _fsm->on_leader_start(leader_start_context.term);
+    if (!_node->arbiter()) {
+        _fsm->on_leader_start(leader_start_context.term);
+    }
 }
 
 int FSMCaller::on_start_following(const LeaderChangeContext& start_following_context) {
diff --git a/src/braft/log_manager.cpp b/src/braft/log_manager.cpp
index cf077751..9b3e2ad4 100644
--- a/src/braft/log_manager.cpp
+++ b/src/braft/log_manager.cpp
@@ -71,6 +71,7 @@ LogManager::LogManager()
     , _next_wait_id(0)
     , _first_log_index(0)
     , _last_log_index(0)
+    , _complete_index(std::numeric_limits<int64_t>::max())
 {
     CHECK_EQ(0, start_disk_thread());
 }
@@ -276,7 +277,8 @@ int LogManager::truncate_prefix(const int64_t first_index_kept,
         _last_log_index = first_index_kept - 1;
     }
     _config_manager->truncate_prefix(first_index_kept);
-    TruncatePrefixClosure* c = new TruncatePrefixClosure(first_index_kept);
+    TruncatePrefixClosure* c = new TruncatePrefixClosure(
+            std::min(first_index_kept, _complete_index.load(butil::memory_order_relaxed)));
     const int rc = bthread::execution_queue_execute(_disk_queue, c);
     lck.unlock();
     for (size_t i = 0; i < saved_logs_in_memory.size(); ++i) {
diff --git a/src/braft/log_manager.h b/src/braft/log_manager.h
index fcd52dc3..17d68a02 100644
--- a/src/braft/log_manager.h
+++ b/src/braft/log_manager.h
@@ -149,6 +149,10 @@ class BAIDU_CACHELINE_ALIGNMENT LogManager {
     // Get the internal status of LogManager.
     void get_status(LogManagerStatus* status);
 
+    void set_complete_index(int64_t index) {
+        _complete_index.store(index, butil::memory_order_relaxed);
+    }
+
 private:
 friend class AppendBatcher;
     struct WaitMeta {
@@ -218,6 +222,7 @@ friend class AppendBatcher;
     int64_t _last_log_index;
     // the last snapshot's log_id
     LogId _last_snapshot_id;
+    butil::atomic<int64_t> _complete_index;
     // the virtual first log, for finding next_index of replicator, which
     // can avoid install_snapshot too often in extreme case where a follower's
     // install_snapshot is slower than leader's save_snapshot
diff --git a/src/braft/node.cpp b/src/braft/node.cpp
index dc96daf8..91f3ae02 100644
--- a/src/braft/node.cpp
+++ b/src/braft/node.cpp
@@ -91,7 +91,15 @@ class ConfigurationChangeDone : public Closure {
             _node->on_configuration_change_done(_term);
             if (_leader_start) {
                 _node->leader_lease_start(_lease_epoch);
-                _node->_options.fsm->on_leader_start(_term);
+                if (_node->arbiter()) {
+                    // todo: handle errors
+                    CHECK(!_node->transfer_leadership_to(ANY_PEER)) << "Arbiter " << _node->node_id()
+                            << " fails to transfer leadership to others";
+                    CHECK(!_node->is_leader()) << "Arbiter " << _node->node_id()
+                            << " is still leader after transfer_leadership_to ANY_PEER";
+                } else {
+                    _node->_options.fsm->on_leader_start(_term);
+                }
             }
         }
         delete this;
@@ -1745,6 +1753,10 @@ void NodeImpl::request_peers_to_vote(const std::set<PeerId>& peers,
     }
 }
 
+int64_t NodeImpl::complete_index() {
+    return _replicator_group.complete_index();
+}
+
 // in lock
 void NodeImpl::step_down(const int64_t term, bool wakeup_a_candidate,
                          const butil::Status& status) {
@@ -1872,21 +1884,6 @@ void NodeImpl::check_step_down(const int64_t request_term, const PeerId& server_
     }
 }
 
-class LeaderStartClosure : public Closure {
-public:
-    LeaderStartClosure(StateMachine* fsm, int64_t term) : _fsm(fsm), _term(term) {}
-    ~LeaderStartClosure() {}
-    void Run() {
-        if (status().ok()) {
-            _fsm->on_leader_start(_term);
-        }
-        delete this;
-    }
-private:
-    StateMachine* _fsm;
-    int64_t _term;
-};
-
 // in lock
 void NodeImpl::become_leader() {
     CHECK(_state == STATE_CANDIDATE);
@@ -2477,6 +2474,9 @@ void NodeImpl::handle_append_entries_request(brpc::Controller* cntl,
             _ballot_box->set_last_committed_index(
                     std::min(request->committed_index(),
                              prev_log_index));
+            if (arbiter()) {
+                _log_manager->set_complete_index(request->complete_index());
+            }
             return;
         }
 
diff --git a/src/braft/node.h b/src/braft/node.h
index d8565f39..17b235c8 100644
--- a/src/braft/node.h
+++ b/src/braft/node.h
@@ -241,6 +241,9 @@ friend class VoteBallotCtx;
 
     bool disable_cli() const { return _options.disable_cli; }
 
+    bool arbiter() { return _options.arbiter; }
+    int64_t complete_index();
+
 private:
     friend class butil::RefCountedThreadSafe<NodeImpl>;
 
diff --git a/src/braft/raft.cpp b/src/braft/raft.cpp
index 6069f706..b9f3c1ab 100644
--- a/src/braft/raft.cpp
+++ b/src/braft/raft.cpp
@@ -155,6 +155,9 @@ PeerId Node::leader_id() {
 }
 
 bool Node::is_leader() {
+    if (_impl->arbiter()) {
+        return false;
+    }
     return _impl->is_leader();
 }
 
diff --git a/src/braft/raft.h b/src/braft/raft.h
index ef9cead8..38f1d7f7 100644
--- a/src/braft/raft.h
+++ b/src/braft/raft.h
@@ -588,6 +588,13 @@ struct NodeOptions {
     // Default: false
     bool disable_cli;
 
+    // If true, this node keeps no copy of user data and only takes part in
+    // elections; from the user's viewpoint it never becomes leader, and
+    // on_apply/on_snapshot_save/on_snapshot_load/on_leader_start/on_leader_stop will not be called.
+    // todo: avoid installing snapshot for arbiter
+    // Default: false
+    bool arbiter;
+
     // Construct a default instance
     NodeOptions();
 
@@ -609,6 +616,7 @@ inline NodeOptions::NodeOptions()
     , snapshot_file_system_adaptor(NULL)
     , snapshot_throttle(NULL)
     , disable_cli(false)
+    , arbiter(false)
 {}
 
 inline int NodeOptions::get_catchup_timeout_ms() {
diff --git a/src/braft/raft.proto b/src/braft/raft.proto
index b2df8e99..e36f0497 100644
--- a/src/braft/raft.proto
+++ b/src/braft/raft.proto
@@ -48,6 +48,7 @@ message AppendEntriesRequest {
     required int64 prev_log_index = 6;
     repeated EntryMeta entries = 7;
     required int64 committed_index = 8;
+    optional int64 complete_index = 9;
 };
 
 message AppendEntriesResponse {
diff --git a/src/braft/replicator.cpp b/src/braft/replicator.cpp
index 2e0d1e17..680d8af8 100644
--- a/src/braft/replicator.cpp
+++ b/src/braft/replicator.cpp
@@ -558,6 +558,7 @@ void Replicator::_send_empty_entries(bool is_heartbeat) {
         _heartbeat_counter++;
         // set RPC timeout for heartbeat, how long should timeout be is waiting to be optimized.
         cntl->set_timeout_ms(*_options.election_timeout_ms / 2);
+        request->set_complete_index(_options.node->complete_index());
     } else {
         _st.st = APPENDING_ENTRIES;
         _st.first_log_index = _next_index;
@@ -756,6 +757,10 @@ void Replicator::_wait_more_entries() {
 }
 
 void Replicator::_install_snapshot() {
+    CHECK(!_options.node->arbiter()) << "node " << _options.group_id << ":" << _options.server_id
+            << " refuses to send InstallSnapshotRequest to " << _options.peer_id
+            << " because it is an arbiter";
+
     if (_reader) {
         // follower's readonly mode change may cause two install_snapshot
         // one possible case is:
@@ -1567,4 +1572,13 @@ bool ReplicatorGroup::readonly(const PeerId& peer) const {
     return Replicator::readonly(rid);
 }
 
+int64_t ReplicatorGroup::complete_index() const {
+    int64_t rst = std::numeric_limits<int64_t>::max();
+    for (std::map<PeerId, ReplicatorIdAndStatus>::const_iterator
+            iter = _rmap.begin(); iter != _rmap.end(); ++iter) {
+        rst = std::min(rst, Replicator::get_next_index(iter->second.id));
+    }
+    return rst;
+}
+
 } // namespace braft
diff --git a/src/braft/replicator.h b/src/braft/replicator.h
index 7223900f..c6c31a58 100644
--- a/src/braft/replicator.h
+++ b/src/braft/replicator.h
@@ -358,6 +358,9 @@ class ReplicatorGroup {
     // Check if a replicator is in readonly
     bool readonly(const PeerId& peer) const;
 
+    // All log entries with index < complete_index() have been persisted by all peers.
+    int64_t complete_index() const;
+
 private:
 
     int _add_replicator(const PeerId& peer, ReplicatorId *rid);
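
---
Usage sketch (illustrative, not part of the patch): with this change an
arbiter is enabled purely through NodeOptions::arbiter. The group id, peer
addresses, storage paths, and DummyStateMachine below are hypothetical;
NodeOptions still requires a StateMachine instance, but on an arbiter its
callbacks are never invoked because FSMCaller skips them.

    #include <braft/raft.h>

    // Placeholder state machine: required by NodeOptions, but never called
    // on an arbiter since FSMCaller bypasses all callbacks when arbiter()
    // is true.
    class DummyStateMachine : public braft::StateMachine {
    public:
        void on_apply(braft::Iterator& iter) override {
            // Drain the iterator defensively; unreachable on an arbiter.
            for (; iter.valid(); iter.next()) {}
        }
    };

    int start_arbiter() {
        braft::NodeOptions options;
        options.fsm = new DummyStateMachine;   // required, effectively inert
        options.node_owns_fsm = true;
        options.arbiter = true;                // the flag added by this patch
        // Hypothetical storage paths and peer list.
        options.log_uri = "local://./arbiter/log";
        options.raft_meta_uri = "local://./arbiter/raft_meta";
        options.snapshot_uri = "local://./arbiter/snapshot";
        if (options.initial_conf.parse_from(
                "10.0.0.1:8100:0,10.0.0.2:8100:0,10.0.0.3:8100:0") != 0) {
            return -1;
        }
        // This process runs the arbiter replica of the hypothetical group.
        braft::Node* node = new braft::Node("test_group",
                                            braft::PeerId("10.0.0.3:8100:0"));
        return node->init(options);
    }

After init(), Node::is_leader() always reports false on this node, and any
leadership it transiently wins is handed off via
transfer_leadership_to(ANY_PEER), per the ConfigurationChangeDone change
above.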