diff --git a/src/braft/errno.proto b/src/braft/errno.proto index 3fc909ec..2fae7008 100644 --- a/src/braft/errno.proto +++ b/src/braft/errno.proto @@ -32,5 +32,9 @@ enum RaftError { ENOMOREUSERLOG = 10015; // Raft node in readonly mode EREADONLY = 10016; + // Raft group not in degraded mode + ENOTDEGRADED = 10017; + // Arbiter node receives non virtual snapshot + ESNAPSHOTNOTVIRTUAL = 10018; }; diff --git a/src/braft/fsm_caller.cpp b/src/braft/fsm_caller.cpp index 98913eea..b8e83bc2 100644 --- a/src/braft/fsm_caller.cpp +++ b/src/braft/fsm_caller.cpp @@ -298,7 +298,13 @@ void FSMCaller::do_committed(int64_t committed_index) { continue; } Iterator iter(&iter_impl); - _fsm->on_apply(iter); + if (!_node->arbiter()) { + _fsm->on_apply(iter); + } else { + while(iter.valid()) { + iter.next(); + } + } LOG_IF(ERROR, iter.valid()) << "Node " << _node->node_id() << " Iterator is still valid, did you return before iterator " @@ -352,8 +358,11 @@ void FSMCaller::do_snapshot_save(SaveSnapshotClosure* done) { done->Run(); return; } - - _fsm->on_snapshot_save(writer, done); + if (!_node->arbiter()) { + _fsm->on_snapshot_save(writer, done); + } else { + done->Run(); + } return; } @@ -402,7 +411,9 @@ void FSMCaller::do_snapshot_load(LoadSnapshotClosure* done) { return done->Run(); } - ret = _fsm->on_snapshot_load(reader); + if (!_node->arbiter()) { + ret = _fsm->on_snapshot_load(reader); + } if (ret != 0) { done->status().set_error(ret, "StateMachine on_snapshot_load failed"); done->Run(); diff --git a/src/braft/log_manager.cpp b/src/braft/log_manager.cpp index cf077751..e8b2343f 100644 --- a/src/braft/log_manager.cpp +++ b/src/braft/log_manager.cpp @@ -24,6 +24,7 @@ #include // BRPC_VALIDATE_GFLAG #include "braft/storage.h" // LogStorage #include "braft/fsm_caller.h" // FSMCaller +#include "braft/snapshot.h" // VirtualSnapshotReader namespace braft { @@ -967,4 +968,30 @@ butil::Status LogManager::check_consistency() { return butil::Status(-1, "Impossible condition"); } +SnapshotReader* LogManager::get_virtual_snapshot() { + BAIDU_SCOPED_LOCK(_mutex); + LogId last_id; + if (_last_log_index >= _first_log_index) { + last_id = LogId(_last_log_index, unsafe_get_term(_last_log_index)); + } else { + last_id = _last_snapshot_id; + } + ConfigurationEntry conf_entry; + _config_manager->get(last_id.index, &conf_entry); + SnapshotMeta meta; + meta.set_last_included_index(last_id.index); + meta.set_last_included_term(last_id.term); + for (Configuration::const_iterator + iter = conf_entry.conf.begin(); + iter != conf_entry.conf.end(); ++iter) { + *meta.add_peers() = iter->to_string(); + } + for (Configuration::const_iterator + iter = conf_entry.old_conf.begin(); + iter != conf_entry.old_conf.end(); ++iter) { + *meta.add_old_peers() = iter->to_string(); + } + return new VirtualSnapshotReader(meta); +} + } // namespace braft diff --git a/src/braft/log_manager.h b/src/braft/log_manager.h index fcd52dc3..bbcf56c9 100644 --- a/src/braft/log_manager.h +++ b/src/braft/log_manager.h @@ -149,6 +149,8 @@ class BAIDU_CACHELINE_ALIGNMENT LogManager { // Get the internal status of LogManager. 
void get_status(LogManagerStatus* status); + SnapshotReader* get_virtual_snapshot(); + private: friend class AppendBatcher; struct WaitMeta { diff --git a/src/braft/node.cpp b/src/braft/node.cpp index f5802f7f..faf30336 100644 --- a/src/braft/node.cpp +++ b/src/braft/node.cpp @@ -157,7 +157,8 @@ NodeImpl::NodeImpl(const GroupId& group_id, const PeerId& peer_id) , _append_entries_cache(NULL) , _append_entries_cache_version(0) , _node_readonly(false) - , _majority_nodes_readonly(false) { + , _majority_nodes_readonly(false) + , _degraded(false) { butil::string_printf(&_v_group_id, "%s_%d", _group_id.c_str(), _server_id.idx); AddRef(); g_num_nodes << 1; @@ -606,6 +607,7 @@ int NodeImpl::init(const NodeOptions& options) { _state = STATE_FOLLOWER; LOG(INFO) << "node " << _group_id << ":" << _server_id << " init," + << " arbiter: " << arbiter() << " term: " << _current_term << " last_log_id: " << _log_manager->last_log_id() << " conf: " << _conf.conf @@ -771,25 +773,26 @@ void NodeImpl::on_caughtup(const PeerId& peer, int64_t term, _conf_ctx.on_caughtup(version, peer, false); } -void NodeImpl::check_dead_nodes(const Configuration& conf, int64_t now_ms) { +void NodeImpl::check_dead_nodes(const Configuration& conf, int64_t now_ms, + Configuration &alive_nodes) { std::vector peers; conf.list_peers(&peers); - size_t alive_count = 0; + alive_nodes.reset(); Configuration dead_nodes; // for easily print for (size_t i = 0; i < peers.size(); i++) { if (peers[i] == _server_id) { - ++alive_count; + alive_nodes.add_peer(peers[i]); continue; } if (now_ms - _replicator_group.last_rpc_send_timestamp(peers[i]) <= _options.election_timeout_ms) { - ++alive_count; + alive_nodes.add_peer(peers[i]); continue; } dead_nodes.add_peer(peers[i]); } - if (alive_count >= peers.size() / 2 + 1) { + if (alive_nodes.size() >= peers.size() / 2 + 1) { return; } LOG(WARNING) << "node " << node_id() @@ -802,6 +805,71 @@ void NodeImpl::check_dead_nodes(const Configuration& conf, int64_t now_ms) { step_down(_current_term, false, status); } +bool NodeImpl::check_degraded(const Configuration &conf, + const Configuration &alive_nodes) { + std::vector peers; + alive_nodes.list_peers(&peers); + std::vector alive_non_arbiters; + for (size_t i = 0; i < peers.size(); i++) { + if (peers[i] == _server_id || !_replicator_group.arbiter(peers[i])) { + alive_non_arbiters.push_back(peers[i]); + } + } + + if (alive_non_arbiters.size() < conf.size() / 2 + 1) { + LOG(WARNING) << "node " << node_id() + << " term " << _current_term + << " alive non-arbiter node don't satisfy quorm" + << " for conf: " << conf + << " alive nodes: " << alive_nodes + << " alive non-arbiter nodes: " << Configuration(alive_non_arbiters); + return true; + } + + // not in degraded mode, skip further check + if (!_degraded) { + return false; + } + + // exit degraded mode if majority already caught up + Configuration caughtup_nodes; + for (size_t i = 0 ; i < alive_non_arbiters.size(); i++) { + if (alive_non_arbiters[i] == _server_id || + _replicator_group.is_caughtup(alive_non_arbiters[i], _options.catchup_margin)) { + caughtup_nodes.add_peer(alive_non_arbiters[i]); + } + } + LOG(WARNING) << "node " << node_id() + << " term " << _current_term + << " in degraded mode and alive non-arbiter node satisfy quorm" + << " for conf: " << conf + << " alive nodes: " << alive_nodes + << " alive non-arbiter nodes: " << Configuration(alive_non_arbiters) + << " alive non-arbiter caught up nodes: " << caughtup_nodes; + return caughtup_nodes.size() < conf.size() / 2 + 1; +} + +void 
NodeImpl::enter_degraded_mode() { + if (!_degraded) { + LOG(WARNING) << "node " << node_id() + << " term " << _current_term + << " enter degraded mode" + << " last_committed_index " << _ballot_box->last_committed_index(); + _degraded = true; + _replicator_group.enter_degraded_mode(); + } +} + +void NodeImpl::exit_degraded_mode() { + if (_degraded) { + LOG(WARNING) << "node " << node_id() + << " term " << _current_term + << " exit degraded mode"; + _degraded = false; + _replicator_group.exit_degraded_mode(); + } +} + void NodeImpl::handle_stepdown_timeout() { BAIDU_SCOPED_LOCK(_mutex); @@ -814,9 +882,25 @@ void NodeImpl::handle_stepdown_timeout() { } int64_t now = butil::monotonic_time_ms(); - check_dead_nodes(_conf.conf, now); + Configuration alive_nodes, old_alive_nodes; + check_dead_nodes(_conf.conf, now, alive_nodes); + if (!_conf.old_conf.empty()) { + check_dead_nodes(_conf.old_conf, now, old_alive_nodes); + } + + if (_state != STATE_LEADER) { + return; + } + + bool degraded = check_degraded(_conf.conf, alive_nodes); if (!_conf.old_conf.empty()) { - check_dead_nodes(_conf.old_conf, now); + degraded |= check_degraded(_conf.old_conf, old_alive_nodes); + } + + if (!degraded) { + exit_degraded_mode(); + } else { + enter_degraded_mode(); } } @@ -1053,6 +1137,12 @@ void NodeImpl::handle_election_timeout() { _leader_id.to_string().c_str()); reset_leader_id(empty_id, status); + if (arbiter()) { + BRAFT_VLOG << "arbiter node " << _group_id << ":" << _server_id + << " lost connection from leader"; + return; + } + return pre_vote(&lck, triggered); // Don't touch any thing of *this ever after } @@ -1090,6 +1180,16 @@ void NodeImpl::handle_timeout_now_request(brpc::Controller* controller, << state2str(saved_state) << " at term=" << saved_term; return; } + if (arbiter()) { + const int64_t saved_term = _current_term; + response->set_term(_current_term); + response->set_success(false); + lck.unlock(); + LOG(WARNING) << "arbiter node " << _group_id << ":" << _server_id + << " received handle_timeout_now_request " + << " at term=" << saved_term; + return; + } const butil::EndPoint remote_side = controller->remote_side(); const int64_t saved_term = _current_term; if (FLAGS_raft_enable_leader_lease) { @@ -1201,6 +1301,12 @@ int NodeImpl::transfer_leadership_to(const PeerId& peer) { << " which doesn't belong to " << _conf.conf; return EINVAL; } + if (_replicator_group.arbiter(peer_id)) { + LOG(INFO) << "node " << _group_id << ":" << _server_id + << " refused to transfer leadership to peer " + << peer_id << " which is arbiter"; + return EINVAL; + } const int64_t last_log_index = _log_manager->last_log_index(); const int rc = _replicator_group.transfer_leadership_to(peer_id, last_log_index); if (rc != 0) { @@ -1211,6 +1317,9 @@ int NodeImpl::transfer_leadership_to(const PeerId& peer) { LOG(WARNING) << "node " << _group_id << ":" << _server_id << " fail to transfer leadership, peer=" << peer_id << " whose consecutive_error_times not 0."; + } else if (rc == ENODATA) { + LOG(WARNING) << "node " << _group_id << ":" << _server_id + << " fail to transfer leadership, peer " << peer_id << " is arbiter"; } else { LOG(WARNING) << "node " << _group_id << ":" << _server_id << " fail to transfer leadership, peer=" << peer_id @@ -1243,6 +1352,9 @@ butil::Status NodeImpl::vote(int election_timeout_ms) { if (_state != STATE_FOLLOWER) { return butil::Status(EPERM, "is not follower"); } + if (arbiter()) { + return butil::Status(EPERM, "is arbiter"); + } int max_election_timeout_ms = _options.max_clock_drift_ms + 
_options.election_timeout_ms; if (election_timeout_ms > max_election_timeout_ms) { return butil::Status(EINVAL, "election_timeout_ms larger than safety threshold"); @@ -1789,6 +1901,7 @@ void NodeImpl::step_down(const int64_t term, bool wakeup_a_candidate, // _conf_ctx.reset() will stop replicators of catching up nodes _conf_ctx.reset(); _majority_nodes_readonly = false; + _degraded = false; clear_append_entries_cache(); @@ -2478,6 +2591,7 @@ void NodeImpl::handle_append_entries_request(brpc::Controller* cntl, response->set_term(_current_term); response->set_last_log_index(_log_manager->last_log_index()); response->set_readonly(_node_readonly); + response->set_arbiter(arbiter()); lck.unlock(); // see the comments at FollowerStableClosure::run() _ballot_box->set_last_committed_index( @@ -2701,10 +2815,12 @@ void NodeImpl::describe(std::ostream& os, bool use_html) { _replicator_group.list_replicators(&replicators); const int64_t leader_timestamp = _follower_lease.last_leader_timestamp(); const bool readonly = (_node_readonly || _majority_nodes_readonly); + const bool degraded = _degraded; lck.unlock(); const char *newline = use_html ? "
" : "\r\n"; os << "peer_id: " << _server_id << newline; os << "state: " << state2str(st) << newline; + os << "arbiter: " << arbiter() << newline; os << "readonly: " << readonly << newline; os << "term: " << term << newline; os << "conf_index: " << conf_index << newline; @@ -2725,7 +2841,8 @@ void NodeImpl::describe(std::ostream& os, bool use_html) { // info of configuration change if (st == STATE_LEADER) { os << "changing_conf: " << is_changing_conf - << " stage: " << conf_statge << newline; + << " stage: " << conf_statge << newline + << "degraded: " << degraded << newline; } if (!new_peers.empty()) { os << "new_peers:"; @@ -3578,6 +3695,11 @@ int64_t NodeImpl::last_leader_active_timestamp() { return timestamp; } +bool NodeImpl::degraded() { + BAIDU_SCOPED_LOCK(_mutex); + return _degraded; +} + struct LastActiveTimestampCompare { bool operator()(const int64_t& a, const int64_t& b) { return a > b; diff --git a/src/braft/node.h b/src/braft/node.h index d8565f39..6307bbc9 100644 --- a/src/braft/node.h +++ b/src/braft/node.h @@ -241,6 +241,10 @@ friend class VoteBallotCtx; bool disable_cli() const { return _options.disable_cli; } + bool arbiter() const { return _options.arbiter; } + + bool degraded(); + private: friend class butil::RefCountedThreadSafe; @@ -304,7 +308,12 @@ friend class butil::RefCountedThreadSafe; static int execute_applying_tasks( void* meta, bthread::TaskIterator& iter); void apply(LogEntryAndClosure tasks[], size_t size); - void check_dead_nodes(const Configuration& conf, int64_t now_ms); + void check_dead_nodes(const Configuration& conf, int64_t now_ms, + Configuration &alive_nodes); + // Check degraded mode for the conf. + // Return true if non-arbiter alive nodes that have caught up can't achieve majority. + bool check_degraded(const Configuration &conf, + const Configuration &alive_nodes); bool handle_out_of_order_append_entries(brpc::Controller* cntl, const AppendEntriesRequest* request, @@ -326,6 +335,9 @@ friend class butil::RefCountedThreadSafe; void request_peers_to_vote(const std::set& peers, const DisruptedLeader& disrupted_leader); + void enter_degraded_mode(); + void exit_degraded_mode(); + private: class ConfigurationCtx { @@ -533,6 +545,8 @@ friend class butil::RefCountedThreadSafe; LeaderLease _leader_lease; FollowerLease _follower_lease; + + bool _degraded; }; } diff --git a/src/braft/raft.cpp b/src/braft/raft.cpp index 6069f706..74fc2db7 100644 --- a/src/braft/raft.cpp +++ b/src/braft/raft.cpp @@ -242,6 +242,10 @@ bool Node::readonly() { return _impl->readonly(); } +bool Node::degraded() { + return _impl->degraded(); +} + // ------------- Iterator void Iterator::next() { if (valid()) { diff --git a/src/braft/raft.h b/src/braft/raft.h index d9730c47..15f527b1 100644 --- a/src/braft/raft.h +++ b/src/braft/raft.h @@ -589,6 +589,17 @@ struct NodeOptions { // Default: false bool disable_cli; + // If true, this is an arbiter node. + // Arbiter make it easier to achieve quorums but not maintain a full copy of data. + // Arbiter participate in leader election but are not eligible to become leader. + // Arbiter will not replicate any log if alive non-arbiter node can achieve quorums. + // Arbiter will replicate log's meta if alive non-arbiter node can't achieve quorums. 
+ // Arbiter will never call following methods in StateMachine: + // on_apply, on_leader_start, on_leader_stop + // on_snapshot_save, on_snapshot_load, on_snapshot_purge + // Default: false + bool arbiter; + // Construct a default instance NodeOptions(); @@ -610,6 +621,7 @@ inline NodeOptions::NodeOptions() , snapshot_file_system_adaptor(NULL) , snapshot_throttle(NULL) , disable_cli(false) + , arbiter(false) {} inline int NodeOptions::get_catchup_timeout_ms() { @@ -775,6 +787,9 @@ class Node { // is less than the majority. bool readonly(); + // Return true if this is the leader, and the group is in degraded mode. + bool degraded(); + private: NodeImpl* _impl; }; diff --git a/src/braft/raft.proto b/src/braft/raft.proto index b2df8e99..c53a416f 100644 --- a/src/braft/raft.proto +++ b/src/braft/raft.proto @@ -55,6 +55,7 @@ message AppendEntriesResponse { required bool success = 2; optional int64 last_log_index = 3; optional bool readonly = 4; + optional bool arbiter = 5; }; message SnapshotMeta { diff --git a/src/braft/replicator.cpp b/src/braft/replicator.cpp index 07b19d9d..b86b21ac 100644 --- a/src/braft/replicator.cpp +++ b/src/braft/replicator.cpp @@ -83,6 +83,8 @@ Replicator::Replicator() , _is_waiter_canceled(false) , _reader(NULL) , _catchup_closure(NULL) + , _arbiter(false) + , _degraded(false) { _install_snapshot_in_fly.value = 0; _heartbeat_in_fly.value = 0; @@ -188,6 +190,13 @@ void Replicator::wait_for_caught_up(ReplicatorId id, run_closure_in_bthread(done); return; } + if (r->_arbiter) { + LOG(INFO) << "Skip wait for arbiter caught up, group" << r->_options.group_id; + run_closure_in_bthread(done); + CHECK_EQ(0, bthread_id_unlock(dummy_id)) + << "Fail to unlock" << dummy_id; + return; + } done->_max_margin = max_margin; if (r->_has_succeeded && r->_is_catchup(max_margin)) { LOG(INFO) << "Already catch up before add catch up timer" @@ -217,6 +226,17 @@ void Replicator::wait_for_caught_up(ReplicatorId id, return; } +bool Replicator::is_caughtup(ReplicatorId id, int64_t max_margin) { + bthread_id_t dummy_id = { id }; + Replicator* r = NULL; + if (bthread_id_lock(dummy_id, (void**)&r) != 0) { + return false; + } + bool caught_up = r->_has_succeeded && r->_is_catchup(max_margin); + CHECK_EQ(0, bthread_id_unlock(dummy_id)) << "Fail to unlock " << dummy_id; + return caught_up; +} + void* Replicator::_on_block_timedout_in_new_thread(void* arg) { Replicator* r = NULL; bthread_id_t id = { (uint64_t)arg }; @@ -246,6 +266,14 @@ void Replicator::_block(long start_time_us, int error_code) { return; } + if (error_code == ENOTDEGRADED) { + LOG(INFO) << "Blocking " << _options.peer_id + << ", not in degraded mode, group " << _options.group_id; + _st.st = BLOCKING; + CHECK_EQ(0, bthread_id_unlock(_id)) << "Fail to unlock " << _id; + return; + } + // TODO: Currently we don't care about error_code which indicates why the // very RPC fails. To make it better there should be different timeout for // each individual error (e.g. 
we don't need check every @@ -333,7 +361,8 @@ void Replicator::_on_heartbeat_returned( } bool readonly = response->has_readonly() && response->readonly(); - BRAFT_VLOG << ss.str() << " readonly " << readonly; + bool arbiter = response->has_arbiter() && response->arbiter(); + BRAFT_VLOG << ss.str() << " readonly " << readonly << " arbiter " << arbiter; r->_update_last_rpc_send_timestamp(rpc_send_time); r->_start_heartbeat_timer(start_time_us); NodeImpl* node_impl = NULL; @@ -413,6 +442,19 @@ void Replicator::_on_rpc_returned(ReplicatorId id, brpc::Controller* cntl, r->_reset_next_index(); return r->_block(start_time_us, cntl->ErrorCode()); } + + // Detect arbiter node. + bool arbiter = response->has_arbiter() && response->arbiter(); + if(!r->_arbiter && arbiter) { + LOG(INFO) << ss.str() << " detect arbiter peer"; + r->_arbiter = arbiter; + r->_notify_on_caught_up(0, false); + // dummy_id is unlock in block + r->_reset_next_index(); + r->_block(0, ENOTDEGRADED); + return; + } + r->_consecutive_error_times = 0; if (!response->success()) { if (response->term() > r->_options.term) { @@ -628,8 +670,11 @@ int Replicator::_prepare_entry(int offset, EntryMeta* em, butil::IOBuf *data) { } else { CHECK(entry->type != ENTRY_TYPE_CONFIGURATION) << "log_index=" << log_index; } - em->set_data_len(entry->data.length()); - data->append(entry->data); + // Only send user log meta to arbiter. + if (!_arbiter || entry->type == ENTRY_TYPE_CONFIGURATION) { + em->set_data_len(entry->data.length()); + data->append(entry->data); + } entry->Release(); return 0; } @@ -777,7 +822,7 @@ void Replicator::_install_snapshot() { return; } - if (_options.snapshot_throttle && !_options.snapshot_throttle-> + if (!_arbiter && _options.snapshot_throttle && !_options.snapshot_throttle-> add_one_more_task(true)) { return _block(butil::gettimeofday_us(), EBUSY); } @@ -786,7 +831,11 @@ void Replicator::_install_snapshot() { // blocked if something is wrong, such as throttled for a period of time _st.st = INSTALLING_SNAPSHOT; - _reader = _options.snapshot_storage->open(); + if (_arbiter) { + _reader = _options.log_manager->get_virtual_snapshot(); + } else { + _reader = _options.snapshot_storage->open(); + } if (!_reader) { if (_options.snapshot_throttle) { _options.snapshot_throttle->finish_one_task(true); @@ -871,9 +920,13 @@ void Replicator::_on_install_snapshot_returned( return; } if (r->_reader) { - r->_options.snapshot_storage->close(r->_reader); + if (!r->_arbiter) { + r->_options.snapshot_storage->close(r->_reader); + } else { + delete r->_reader; + } r->_reader = NULL; - if (r->_options.snapshot_throttle) { + if (!r->_arbiter && r->_options.snapshot_throttle) { r->_options.snapshot_throttle->finish_one_task(true); } } @@ -884,6 +937,21 @@ void Replicator::_on_install_snapshot_returned( << " last_included_term " << request->meta().last_included_term(); do { if (cntl->Failed()) { + // Detect arbiter node. 
+ bool arbiter = cntl->ErrorCode() == ESNAPSHOTNOTVIRTUAL; + if (!r->_arbiter && arbiter) { + // Arbiter reject install non virtual snapshot, this can happend if we + // don't know a node is arbiter and try install a normal snapshot to it + LOG(INFO) << ss.str() << ", error: " <ErrorText() + << ", detect arbiter peer"; + r->_arbiter = arbiter; + r->_notify_on_caught_up(0, false); + r->_reset_next_index(); + // dummy_id is unlock in block + r->_block(0, ENOTDEGRADED); + return; + } + ss << " error: " << cntl->ErrorText(); LOG(INFO) << ss.str(); @@ -904,6 +972,11 @@ void Replicator::_on_install_snapshot_returned( } // Success r->_next_index = request->meta().last_included_index() + 1; + if (r->_arbiter) { + r->_options.ballot_box->commit_at( + 0, request->meta().last_included_index(), + r->_options.peer_id); + } ss << " success."; LOG(INFO) << ss.str(); } while (0); @@ -1053,6 +1126,10 @@ int Replicator::stop_transfer_leadership(ReplicatorId id) { } int Replicator::_transfer_leadership(int64_t log_index) { + if (_arbiter) { + CHECK_EQ(0, bthread_id_unlock(_id)) << "Fail to unlock " << _id; + return ENODATA; + } if (_has_succeeded && _min_flying_index() > log_index) { // _id is unlock in _send_timeout_now _send_timeout_now(true, false); @@ -1252,6 +1329,54 @@ bool Replicator::readonly(ReplicatorId id) { return readonly; } +bool Replicator::arbiter(ReplicatorId id) { + Replicator *r = NULL; + bthread_id_t dummy_id = { id }; + if (bthread_id_lock(dummy_id, (void**)&r) != 0) { + return false; + } + bool arbiter = r->_arbiter; + CHECK_EQ(0, bthread_id_unlock(dummy_id)) << "Fail to unlock " << dummy_id; + return arbiter; +} + +int Replicator::enter_degraded_mode(ReplicatorId id) { + Replicator *r = NULL; + bthread_id_t dummy_id = { id }; + if (bthread_id_lock(dummy_id, (void**)&r) != 0) { + return -1; + } + if(!r->_arbiter || r->_degraded) { + CHECK_EQ(0, bthread_id_unlock(dummy_id)) << "Fail to unlock " << dummy_id; + return -1; + } + LOG(INFO) << "node " << r->_options.group_id << ":" << r->_options.server_id + << " resume replicating log to arbiter peer " << r->_options.peer_id; + r->_degraded = true; + // dummy_id is unlock in _install_snapshot. + r->_install_snapshot(); + return 0; +} + +int Replicator::exit_degraded_mode(ReplicatorId id) { + Replicator *r = NULL; + bthread_id_t dummy_id = { id }; + if (bthread_id_lock(dummy_id, (void**)&r) != 0) { + return -1; + } + if(!r->_arbiter || !r->_degraded) { + CHECK_EQ(0, bthread_id_unlock(dummy_id)) << "Fail to unlock " << dummy_id; + return -1; + } + LOG(INFO) << "node " << r->_options.group_id << ":" << r->_options.server_id + << " stop replicating log to arbiter peer " << r->_options.peer_id; + r->_degraded = false; + r->_reset_next_index(); + // dummy_id is unlock in _block + r->_block(0, ENOTDEGRADED); + return 0; +} + void Replicator::_destroy() { bthread_id_t saved_id = _id; CHECK_EQ(0, bthread_id_unlock_and_destroy(saved_id)); @@ -1271,10 +1396,12 @@ void Replicator::_describe(std::ostream& os, bool use_html) { const int64_t append_entries_counter = _append_entries_counter; const int64_t install_snapshot_counter = _install_snapshot_counter; const int64_t readonly_index = _readonly_index; + const bool arbiter = _arbiter; CHECK_EQ(0, bthread_id_unlock(_id)); // Don't touch *this ever after const char* new_line = use_html ? "
" : "\r\n"; os << "replicator_" << id << '@' << peer_id << ':'; + os << " arbiter=" << arbiter << ' '; os << " next_index=" << next_index << ' '; os << " flying_append_entries_size=" << flying_append_entries_size << ' '; if (readonly_index != 0) { @@ -1337,9 +1464,13 @@ void Replicator::get_status(ReplicatorId id, PeerStatus* status) { void Replicator::_close_reader() { if (_reader) { - _options.snapshot_storage->close(_reader); + if(!_arbiter) { + _options.snapshot_storage->close(_reader); + } else { + delete _reader; + } _reader = NULL; - if (_options.snapshot_throttle) { + if (!_arbiter && _options.snapshot_throttle) { _options.snapshot_throttle->finish_one_task(true); } } @@ -1414,6 +1545,15 @@ int ReplicatorGroup::wait_caughtup(const PeerId& peer, return 0; } +bool ReplicatorGroup::is_caughtup(const PeerId& peer, int64_t max_margin) { + std::map::const_iterator iter = _rmap.find(peer); + if (iter == _rmap.end()) { + return false; + } + ReplicatorId rid = iter->second.id; + return Replicator::is_caughtup(rid, max_margin); +} + int64_t ReplicatorGroup::last_rpc_send_timestamp(const PeerId& peer) { std::map::iterator iter = _rmap.find(peer); if (iter == _rmap.end()) { @@ -1525,6 +1665,9 @@ int ReplicatorGroup::find_the_next_candidate( if (!conf.contains(iter->first)) { continue; } + if (Replicator::arbiter(iter->second.id)) { + continue; + } const int64_t next_index = Replicator::get_next_index(iter->second.id); const int consecutive_error_times = Replicator::get_consecutive_error_times(iter->second.id); if (consecutive_error_times == 0 && next_index > max_index) { @@ -1577,4 +1720,29 @@ bool ReplicatorGroup::readonly(const PeerId& peer) const { return Replicator::readonly(rid); } +bool ReplicatorGroup::arbiter(const PeerId& peer) const { + std::map::const_iterator iter = _rmap.find(peer); + if (iter == _rmap.end()) { + return false; + } + ReplicatorId rid = iter->second.id; + return Replicator::arbiter(rid); +} + +int ReplicatorGroup::enter_degraded_mode() { + for (std::map::const_iterator + iter = _rmap.begin(); iter != _rmap.end(); ++iter) { + Replicator::enter_degraded_mode(iter->second.id); + } + return 0; +} + +int ReplicatorGroup::exit_degraded_mode() { + for (std::map::const_iterator + iter = _rmap.begin(); iter != _rmap.end(); ++iter) { + Replicator::exit_degraded_mode(iter->second.id); + } + return 0; +} + } // namespace braft diff --git a/src/braft/replicator.h b/src/braft/replicator.h index 7223900f..a4abc6bf 100644 --- a/src/braft/replicator.h +++ b/src/braft/replicator.h @@ -97,6 +97,10 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { static void wait_for_caught_up(ReplicatorId, int64_t max_margin, const timespec* due_time, CatchupClosure* done); + + // Check if the the margin between |last_log_index| from leader and the peer + // is less than |max_margin|. 
+ static bool is_caughtup(ReplicatorId id, int64_t max_margin); // Tranfer leadership to the very peer if the replicated logs are over // |log_index| @@ -127,6 +131,12 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { // Check if a replicator is readonly static bool readonly(ReplicatorId id); + + // Check if a replicator is arbiter + static bool arbiter(ReplicatorId id); + + static int enter_degraded_mode(ReplicatorId id); + static int exit_degraded_mode(ReplicatorId id); private: enum St { @@ -205,6 +215,9 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { void _describe(std::ostream& os, bool use_html); void _get_status(PeerStatus* status); bool _is_catchup(int64_t max_margin) { + if (_arbiter) { + return true; + } // We should wait until install snapshot finish. If the process is throttled, // it maybe very slow. if (_next_index < _options.log_manager->first_log_index()) { @@ -258,6 +271,8 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { bthread_timer_t _heartbeat_timer; SnapshotReader* _reader; CatchupClosure *_catchup_closure; + bool _arbiter; + bool _degraded; }; struct ReplicatorGroupOptions { @@ -296,6 +311,9 @@ class ReplicatorGroup { // wait the very peer catchup int wait_caughtup(const PeerId& peer, int64_t max_margin, const timespec* due_time, CatchupClosure* done); + + // Check the very peer catchup + bool is_caughtup(const PeerId& peer, int64_t max_margin); int64_t last_rpc_send_timestamp(const PeerId& peer); @@ -358,6 +376,12 @@ class ReplicatorGroup { // Check if a replicator is in readonly bool readonly(const PeerId& peer) const; + // Check if a replicator is arbiter + bool arbiter(const PeerId& peer) const; + + int enter_degraded_mode(); + int exit_degraded_mode(); + private: int _add_replicator(const PeerId& peer, ReplicatorId *rid); diff --git a/src/braft/snapshot.cpp b/src/braft/snapshot.cpp index 8bf7d173..a5146e41 100644 --- a/src/braft/snapshot.cpp +++ b/src/braft/snapshot.cpp @@ -590,6 +590,13 @@ SnapshotCopier* LocalSnapshotStorage::start_to_copy_from(const std::string& uri) return copier; } +SnapshotCopier* LocalSnapshotStorage::start_to_copy_from(const SnapshotMeta& meta) { + VirtualSnapshotCopier* copier = new VirtualSnapshotCopier(meta); + copier->_storage = this; + copier->start(); + return copier; +} + int LocalSnapshotStorage::close(SnapshotCopier* copier) { delete copier; return 0; @@ -1031,4 +1038,57 @@ int LocalSnapshotCopier::init(const std::string& uri) { return _copier.init(uri, _fs, _throttle); } + +// Virtual SnapshotCopier + +VirtualSnapshotCopier::VirtualSnapshotCopier(const SnapshotMeta& meta) + : _tid(INVALID_BTHREAD) + , _meta(meta) + , _reader(NULL) + , _writer(NULL) + , _storage(NULL) {} + +VirtualSnapshotCopier::~VirtualSnapshotCopier() { + CHECK(!_writer); +} + +void VirtualSnapshotCopier::join() { + bthread_join(_tid, NULL); +} + +void VirtualSnapshotCopier::start() { + if (bthread_start_background( + &_tid, NULL, start_copy, this) != 0) { + PLOG(ERROR) << "Fail to start bthread"; + copy(); + } +} + +void *VirtualSnapshotCopier::start_copy(void* arg) { + VirtualSnapshotCopier* c = (VirtualSnapshotCopier*)arg; + c->copy(); + return NULL; +} + +void VirtualSnapshotCopier::copy() { + _writer = (LocalSnapshotWriter *)_storage->create(); + if (_writer == NULL) { + set_error(EIO, "Fail to create snapshot writer"); + return; + } + + _writer->save_meta(_meta); + if (_writer->sync() != 0) { + set_error(EIO, "Fail to sync snapshot writer"); + } + if (_storage->close(_writer) != 0 && ok()) { + set_error(EIO, "Fail to close writer"); + } + _writer = NULL; + 
+ if (ok()) { + _reader = _storage->open(); + } +} + } // namespace braft diff --git a/src/braft/snapshot.h b/src/braft/snapshot.h index 448ff4cd..8f8ec0cc 100644 --- a/src/braft/snapshot.h +++ b/src/braft/snapshot.h @@ -194,6 +194,7 @@ friend class LocalSnapshotCopier; virtual int close(SnapshotReader* reader); virtual SnapshotReader* copy_from(const std::string& uri) WARN_UNUSED_RESULT; virtual SnapshotCopier* start_to_copy_from(const std::string& uri); + virtual SnapshotCopier* start_to_copy_from(const SnapshotMeta& meta); virtual int close(SnapshotCopier* copier); virtual int set_filter_before_copy_remote(); virtual int set_file_system_adaptor(FileSystemAdaptor* fs); @@ -221,6 +222,47 @@ friend class LocalSnapshotCopier; scoped_refptr _snapshot_throttle; }; +constexpr const char* kVirtualSnapshot = "VirtualSnapshot"; + +class VirtualSnapshotReader: public SnapshotReader { +public: + VirtualSnapshotReader(const SnapshotMeta& meta) : _meta(meta) {}; + virtual ~VirtualSnapshotReader() = default; + virtual int load_meta(SnapshotMeta* meta) { + *meta = _meta; + return 0; + } + // Get the path of the Snapshot + virtual std::string get_path() { return std::string(); } + // Generate uri for other peers to copy this snapshot. + // Return an empty string if some error has occcured + virtual std::string generate_uri_for_copy() { return kVirtualSnapshot; } + // List all the existing files in the Snapshot currently + virtual void list_files(std::vector *files) {} +private: + SnapshotMeta _meta; +}; + +class VirtualSnapshotCopier : public SnapshotCopier { +friend class LocalSnapshotStorage; +public: + VirtualSnapshotCopier(const SnapshotMeta& meta); + ~VirtualSnapshotCopier(); + virtual void cancel() {}; + virtual void join(); + virtual SnapshotReader* get_reader() { return _reader; } +private: + static void* start_copy(void* arg); + void start(); + void copy(); + + bthread_t _tid; + SnapshotMeta _meta; + SnapshotReader* _reader; + LocalSnapshotWriter* _writer; + LocalSnapshotStorage* _storage; +}; + } // namespace braft #endif //~BRAFT_RAFT_SNAPSHOT_H diff --git a/src/braft/snapshot_executor.cpp b/src/braft/snapshot_executor.cpp index 64403013..3ac47320 100644 --- a/src/braft/snapshot_executor.cpp +++ b/src/braft/snapshot_executor.cpp @@ -522,6 +522,11 @@ int SnapshotExecutor::register_downloading_snapshot(DownloadingSnapshot* ds) { ds->response->set_success(true); return -1; } + if (ds->request->uri() != kVirtualSnapshot && _node->arbiter()) { + LOG(WARNING) << "Register failed: arbiter node receive non virtual snapshot."; + ds->cntl->SetFailed(ESNAPSHOTNOTVIRTUAL, "Arbiter node recieve non virtual snapshot"); + return -1; + } ds->response->set_term(_term); if (_saving_snapshot) { LOG(WARNING) << "Register failed: is saving snapshot."; @@ -534,7 +539,13 @@ int SnapshotExecutor::register_downloading_snapshot(DownloadingSnapshot* ds) { _downloading_snapshot.store(ds, butil::memory_order_relaxed); // Now this session has the right to download the snapshot. 
CHECK(!_cur_copier); - _cur_copier = _snapshot_storage->start_to_copy_from(ds->request->uri()); + if (ds->request->uri() != kVirtualSnapshot) { + CHECK(!_node->arbiter()); + _cur_copier = _snapshot_storage->start_to_copy_from(ds->request->uri()); + } else { + CHECK(_node->arbiter()); + _cur_copier = _snapshot_storage->start_to_copy_from(ds->request->meta()); + } if (_cur_copier == NULL) { _downloading_snapshot.store(NULL, butil::memory_order_relaxed); lck.unlock(); diff --git a/src/braft/storage.h b/src/braft/storage.h index 21b5e0d3..02eff6db 100644 --- a/src/braft/storage.h +++ b/src/braft/storage.h @@ -336,6 +336,7 @@ class SnapshotStorage { // Copy snapshot from uri and open it as a SnapshotReader virtual SnapshotReader* copy_from(const std::string& uri) WARN_UNUSED_RESULT = 0; virtual SnapshotCopier* start_to_copy_from(const std::string& uri) = 0; + virtual SnapshotCopier* start_to_copy_from(const SnapshotMeta& meta) = 0; virtual int close(SnapshotCopier* copier) = 0; // Create an instance of this kind of SnapshotStorage with the parameters encoded diff --git a/test/test_node.cpp b/test/test_node.cpp index fc5f14bb..a83b2dc2 100644 --- a/test/test_node.cpp +++ b/test/test_node.cpp @@ -299,6 +299,405 @@ TEST_P(NodeTest, TripleNode) { cluster.stop_all(); } +TEST_P(NodeTest, ArbiterBasic) { + std::vector peers; + for (int i = 0; i < 3; i++) { + braft::PeerId peer; + peer.addr.ip = butil::my_ip(); + peer.addr.port = 5006 + i; + peer.idx = 0; + + peers.push_back(peer); + } + + // 1. start cluster + Cluster cluster("unittest", peers); + // peer[0] is arbiter + ASSERT_EQ(0, cluster.start(peers[0].addr, false, 30, nullptr, true)); + ASSERT_EQ(0, cluster.start(peers[1].addr)); + + // elect leader + cluster.wait_leader(); + braft::Node* leader = cluster.leader(); + // arbiter should not become leader + ASSERT_TRUE(leader->node_id().peer_id.addr == peers[1].addr); + + // 2. arbiter can't vote + braft::Node* arbiter = cluster.find_node(peers[0]); + butil::Status st = arbiter->vote(0); + ASSERT_EQ(st.error_code(), EPERM); + + // 3. transer leadership to arbiter should fail + leader->transfer_leadership_to(peers[0]); + cluster.wait_leader(); + ASSERT_EQ(cluster.leader()->node_id().peer_id, peers[1]); + + // 4. apply something (in degraded mode) + bthread::CountdownEvent cond(10); + for (int i = 0; i < 10; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello degraded: %d", i + 1); + data.append(data_buf); + + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, 0); + leader->apply(task); + } + cond.wait(); + ASSERT_TRUE(leader->degraded()); + + // 5. stop arbiter + cluster.stop(peers[0].addr); + + // apply should failed + cond.reset(10); + for (int i = 0; i < 10; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello failed: %d", i + 1); + data.append(data_buf); + + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, -1); + leader->apply(task); + } + cond.wait(); + + // 6. restart arbiter + ASSERT_EQ(0, cluster.start(peers[0].addr, false, 30, nullptr, true)); + cluster.wait_leader(); + ASSERT_EQ(cluster.leader(), leader); + + // apply something + cond.reset(10); + for (int i = 0; i < 10; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello: %d", i + 1); + data.append(data_buf); + + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, 0); + leader->apply(task); + } + cond.wait(); + + // 7. 
stop cluster + LOG(WARNING) << "cluster stop"; + cluster.stop_all(); +} + +TEST_P(NodeTest, FollowerFailWithArbiter) { + std::vector peers; + for (int i = 0; i < 3; i++) { + braft::PeerId peer; + peer.addr.ip = butil::my_ip(); + peer.addr.port = 5006 + i; + peer.idx = 0; + + peers.push_back(peer); + } + + // 1. start cluster + Cluster cluster("unittest", peers); + // peer[0] is arbiter + ASSERT_EQ(0, cluster.start(peers[0].addr, false, 30, nullptr, true)); + ASSERT_EQ(0, cluster.start(peers[1].addr)); + ASSERT_EQ(0, cluster.start(peers[2].addr)); + + // elect leader + cluster.wait_leader(); + braft::Node* leader = cluster.leader(); + ASSERT_TRUE(leader != NULL); + ASSERT_FALSE(leader->degraded()); + LOG(WARNING) << "leader is " << leader->node_id(); + + // 2. apply something (not in degraded mode) + bthread::CountdownEvent cond(10); + for (int i = 0; i < 10; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello normal: %d", i + 1); + data.append(data_buf); + + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, 0); + leader->apply(task); + } + cond.wait(); + // wait apply and print some log + sleep(2); + + // arbiter should not commit any log + braft::Node* arbiter = cluster.find_node(peers[0]); + UserLog log; + butil::Status stat = arbiter->read_committed_user_log(1, &log); + ASSERT_EQ(stat.error_code(), ENOMOREUSERLOG); + + // 3. enter/exit degraded mode many times by stop follower + std::vector nodes; + cluster.followers(&nodes); + ASSERT_EQ(2, nodes.size()); + butil::EndPoint follower_addr = (nodes[0]->node_id().peer_id == peers[0])? + nodes[1]->node_id().peer_id.addr: + nodes[0]->node_id().peer_id.addr; + + for (int loop = 0; loop < 3; loop++) { + // stop leader + cluster.stop(follower_addr); + LOG(WARNING) << "stop follower: " << follower_addr << " loop " << loop; + + // apply something + cond.reset(10); + for (int i = 0; i < 10; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello degraded: %d, loop %d", i + 1, loop); + data.append(data_buf); + + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, 0); + leader->apply(task); + } + cond.wait(); + + ASSERT_TRUE(leader->degraded()) << " loop " << loop; + + // re start the follower and wait group exit degraded mode + cluster.start(follower_addr); + ASSERT_TRUE(cluster.wait_exit_degraded_mode(leader)) << " loop " << loop; + + // apply something + cond.reset(10); + for (int i = 0; i < 10; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello normal again: %d, loop %d", i + 1, loop); + data.append(data_buf); + + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, 0); + leader->apply(task); + } + cond.wait(); + } + + cluster.ensure_same(); + + // 4. stop cluster + LOG(WARNING) << "cluster stop"; + cluster.stop_all(); +} + +TEST_P(NodeTest, LeaderFailWithArbiter) { + std::vector peers; + for (int i = 0; i < 3; i++) { + braft::PeerId peer; + peer.addr.ip = butil::my_ip(); + peer.addr.port = 5006 + i; + peer.idx = 0; + + peers.push_back(peer); + } + + // 1. 
start cluster + Cluster cluster("unittest", peers); + // peer[0] is arbiter + ASSERT_EQ(0, cluster.start(peers[0].addr, false, 30, nullptr, true)); + ASSERT_EQ(0, cluster.start(peers[1].addr)); + ASSERT_EQ(0, cluster.start(peers[2].addr)); + + // elect leader + cluster.wait_leader(); + braft::Node* leader = cluster.leader(); + ASSERT_TRUE(leader != NULL); + ASSERT_FALSE(leader->degraded()); + LOG(WARNING) << "leader is " << leader->node_id(); + + + // 2. apply something (not in degraded mode) + bthread::CountdownEvent cond(10); + for (int i = 0; i < 10; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello normal: %d", i + 1); + data.append(data_buf); + + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, 0); + leader->apply(task); + } + cond.wait(); + + // 3. stop leader + butil::EndPoint old_leader = leader->node_id().peer_id.addr; + LOG(WARNING) << "stop leader " << leader->node_id(); + cluster.stop(leader->node_id().peer_id.addr); + + // 4. apply something when follower + std::vector nodes; + cluster.followers(&nodes); + cond.reset(10); + for (int i = 0; i < 10; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "follower apply: %d", i + 1); + data.append(data_buf); + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, -1); + nodes[i % nodes.size()]->apply(task); + } + cond.wait(); + + // elect new leader + cluster.wait_leader(); + leader = cluster.leader(); + ASSERT_TRUE(leader != NULL); + // arbiter should not become leader + ASSERT_NE(leader->node_id().peer_id.addr, peers[0].addr); + LOG(WARNING) << "elect new leader " << leader->node_id(); + + // 5. apply something + cond.reset(10); + for (int i = 10; i < 20; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello: %d", i + 1); + data.append(data_buf); + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, 0); + leader->apply(task); + } + cond.wait(); + ASSERT_TRUE(leader->degraded()); + + // 6. old leader restart + ASSERT_EQ(0, cluster.start(old_leader)); + LOG(WARNING) << "restart old leader " << old_leader; + + // 7. apply something + cond.reset(10); + for (int i = 20; i < 30; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello: %d", i + 1); + data.append(data_buf); + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, 0); + leader->apply(task); + } + cond.wait(); + + // 8. stop and clean old leader + LOG(WARNING) << "stop old leader " << old_leader; + cluster.stop(old_leader); + LOG(WARNING) << "clean old leader data " << old_leader; + cluster.clean(old_leader); + + sleep(2); + // 9. restart old leader + ASSERT_EQ(0, cluster.start(old_leader)); + LOG(WARNING) << "restart old leader " << old_leader; + + ASSERT_TRUE(cluster.wait_exit_degraded_mode(leader)); + cluster.ensure_same(); + + // 10. stop cluster + LOG(WARNING) << "cluster stop"; + cluster.stop_all(); +} + +TEST_P(NodeTest, ChangePeersWithArbiter) { + std::vector peers; + braft::PeerId peer0("127.0.0.1:5006"); + braft::PeerId peer1("127.0.0.1:5007"); + braft::PeerId peer2("127.0.0.1:5008"); + braft::PeerId peer3("127.0.0.1:5009"); + + // 1. 
start cluster + peers = { peer0, peer1, peer2 }; + Cluster cluster("unittest", peers); + ASSERT_EQ(0, cluster.start(peer0.addr, false, 30, nullptr, true)); + ASSERT_EQ(0, cluster.start(peer1.addr)); + ASSERT_EQ(0, cluster.start(peer2.addr)); + + cluster.wait_leader(); + braft::Node* leader = cluster.leader(); + ASSERT_TRUE(leader != NULL); + ASSERT_FALSE(leader->degraded()); + LOG(WARNING) << "leader is " << leader->node_id(); + + // 2. apply something + bthread::CountdownEvent cond(10); + for (int i = 0; i < 10; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello normal: %d", i + 1); + data.append(data_buf); + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, 0); + leader->apply(task); + } + cond.wait(); + + // 3. stop follower + std::vector nodes; + cluster.followers(&nodes); + ASSERT_EQ(2, nodes.size()); + butil::EndPoint follower_addr = (nodes[0]->node_id().peer_id == peers[0])? + nodes[1]->node_id().peer_id.addr: + nodes[0]->node_id().peer_id.addr; + cluster.stop(follower_addr); + LOG(WARNING) << "stop follower: " << follower_addr; + + // 4. apply something + cond.reset(10); + for (int i = 0; i < 10; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello degraded: %d", i + 1); + data.append(data_buf); + + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, 0); + leader->apply(task); + } + cond.wait(); + + ASSERT_TRUE(leader->degraded()); + + // 5. start peer3 + ASSERT_EQ(0, cluster.start(peer3.addr)); + + // 6. change the stopped peer to peer3 + peers.erase(std::remove(peers.begin(), peers.end(), braft::PeerId(follower_addr)), peers.end()); + peers.push_back(peer3); + braft::SynchronizedClosure done; + leader->change_peers(braft::Configuration(peers), &done); + done.wait(); + ASSERT_TRUE(done.status().ok()); + + ASSERT_TRUE(cluster.wait_exit_degraded_mode(leader)); + cluster.ensure_same(); + + // 7. 
stop cluster + LOG(WARNING) << "cluster stop"; + cluster.stop_all(); +} + TEST_P(NodeTest, LeaderFail) { std::vector peers; for (int i = 0; i < 3; i++) { diff --git a/test/test_snapshot_executor.cpp b/test/test_snapshot_executor.cpp index 7d63bb0f..39ffd847 100644 --- a/test/test_snapshot_executor.cpp +++ b/test/test_snapshot_executor.cpp @@ -179,6 +179,11 @@ friend class MockSnapshotCopier; return copier; } + virtual SnapshotCopier* start_to_copy_from(const SnapshotMeta& meta) { + return NULL; + } + + virtual int close(SnapshotCopier* copier) { delete copier; return 0; diff --git a/test/util.h b/test/util.h index e6984920..7758652e 100644 --- a/test/util.h +++ b/test/util.h @@ -233,7 +233,7 @@ class Cluster { int start(const butil::EndPoint& listen_addr, bool empty_peers = false, int snapshot_interval_s = 30, - braft::Closure* leader_start_closure = NULL) { + braft::Closure* leader_start_closure = NULL, bool arbiter = false) { if (_server_map[listen_addr] == NULL) { brpc::Server* server = new brpc::Server(); if (braft::add_service(server, listen_addr) != 0 @@ -269,6 +269,8 @@ class Cluster { options.snapshot_throttle = &tst; options.catchup_margin = 2; + + options.arbiter = arbiter; braft::Node* node = new braft::Node(_name, braft::PeerId(listen_addr, 0)); int ret = node->init(options); @@ -380,6 +382,18 @@ class Cluster { } } + bool wait_exit_degraded_mode(braft::Node *node) { + while (true) { + if (!node->is_leader()) { + return false; + } else if (!node->degraded()) { + return true; + } else { + usleep(100 * 1000); + } + } + } + void check_node_status() { std::vector nodes; { @@ -424,10 +438,21 @@ class Cluster { LOG(INFO) << "_fsms.size()=" << _fsms.size(); int nround = 0; - MockFSM* first = _fsms[0]; + MockFSM* first = nullptr; + for (size_t i = 0; i < _fsms.size(); i++) { + if (!_nodes[i]->_impl->arbiter()) { + first = _fsms[i]; + } + } + if(first == nullptr) { + return false; + } CHECK: first->lock(); - for (size_t i = 1; i < _fsms.size(); i++) { + for (size_t i = 0; i < _fsms.size(); i++) { + if (_fsms[i] == first || _nodes[i]->_impl->arbiter()) { + continue; + } MockFSM* fsm = _fsms[i]; fsm->lock();
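
Summary of the behavior this patch introduces, for reviewers: an arbiter grants votes but never campaigns or becomes leader, and its StateMachine callbacks (on_apply, on_snapshot_save, on_snapshot_load, ...) are skipped. While the alive non-arbiter peers of a configuration can form a majority on their own, the leader does not replicate log entries to arbiter peers at all (their replicators stay blocked with ENOTDEGRADED); when they cannot, the leader enters degraded mode, sends the arbiter a virtual (metadata-only) snapshot, and replicates entries without their payloads so that the arbiter's acknowledgements can complete quorums. The leader exits degraded mode once a majority of non-arbiter peers is alive and caught up again.

Below is a minimal, illustrative C++ sketch (not part of the diff) of starting the third member of a three-peer group as an arbiter via the new NodeOptions::arbiter flag. The group name, ports, and storage URIs are invented for the example; a StateMachine instance is still supplied, as the tests in this patch do, even though its callbacks are never invoked on an arbiter.

#include <braft/raft.h>
#include <brpc/server.h>
#include <butil/endpoint.h>

// Placeholder state machine: on an arbiter this patch never calls
// on_apply/on_snapshot_*, but NodeOptions still needs an instance.
class DummyFSM : public braft::StateMachine {
public:
    void on_apply(braft::Iterator& iter) override {
        for (; iter.valid(); iter.next()) {}  // drain; not reached on an arbiter
    }
};

int main() {
    butil::EndPoint addr;
    butil::str2endpoint("127.0.0.1:5008", &addr);

    brpc::Server server;
    if (braft::add_service(&server, addr) != 0 ||
        server.Start(addr, NULL) != 0) {
        return -1;
    }

    braft::NodeOptions options;
    options.fsm = new DummyFSM;
    options.node_owns_fsm = true;
    options.election_timeout_ms = 1000;
    options.snapshot_interval_s = 30;
    options.log_uri = "local://./arbiter/log";
    options.raft_meta_uri = "local://./arbiter/raft_meta";
    options.snapshot_uri = "local://./arbiter/snapshot";
    options.initial_conf.parse_from(
        "127.0.0.1:5006:0,127.0.0.1:5007:0,127.0.0.1:5008:0");
    options.arbiter = true;  // this node only helps form quorums

    braft::Node node("example_group", braft::PeerId(addr, 0));
    if (node.init(options) != 0) {
        return -1;
    }

    // On the data-bearing peers, the leader's new Node::degraded()
    // reports whether commits currently depend on the arbiter.
    // ... serve until shutdown ...
    node.shutdown(NULL);
    node.join();
    server.Stop(0);
    server.Join();
    return 0;
}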