support arbiter in a new way #404

Open · wants to merge 1 commit into base: master
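From the changes below, an arbiter is a quorum-only member of a raft group: it stores and acknowledges log entries and counts toward majorities, but it never applies entries to a state machine, exchanges only metadata-only "virtual" snapshots, never campaigns when its election timeout fires, and rejects leadership transfer. Leaders additionally track a "degraded" mode, entered when the live non-arbiter replicas no longer form a quorum by themselves.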
4 changes: 4 additions & 0 deletions src/braft/errno.proto
@@ -32,5 +32,9 @@ enum RaftError {
ENOMOREUSERLOG = 10015;
// Raft node in readonly mode
EREADONLY = 10016;
+// Raft group not in degraded mode
+ENOTDEGRADED = 10017;
+// Arbiter node receives a non-virtual snapshot
+ESNAPSHOTNOTVIRTUAL = 10018;
};

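A hypothetical helper showing how a client might report the two new codes; the function name and messages are illustrative, and only the numeric values come from the enum above:

```cpp
// Illustrative only; not part of braft. Maps the new arbiter-related
// error codes (values mirror the enum addition above) to short messages.
const char* arbiter_errmsg(int raft_errno) {
    switch (raft_errno) {
    case 10017:  // ENOTDEGRADED
        return "raft group not in degraded mode";
    case 10018:  // ESNAPSHOTNOTVIRTUAL
        return "arbiter node received a non-virtual snapshot";
    default:
        return "not an arbiter-related error";
    }
}
```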
19 changes: 15 additions & 4 deletions src/braft/fsm_caller.cpp
@@ -298,7 +298,13 @@ void FSMCaller::do_committed(int64_t committed_index) {
continue;
}
Iterator iter(&iter_impl);
-_fsm->on_apply(iter);
+if (!_node->arbiter()) {
+    _fsm->on_apply(iter);
+} else {
+    while (iter.valid()) {
+        iter.next();
+    }
+}
LOG_IF(ERROR, iter.valid())
<< "Node " << _node->node_id()
<< " Iterator is still valid, did you return before iterator "
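The arbiter branch drains the iterator instead of handing it to the user state machine, so apply-path bookkeeping (including the LOG_IF check that follows) still observes an exhausted iterator. A minimal sketch of that contract, with braft's Iterator reduced to a stand-in:

```cpp
// Stand-in for braft::Iterator; only the traversal contract matters here.
struct FakeIterator {
    long index = 0;
    long end = 0;
    bool valid() const { return index < end; }
    void next() { ++index; }
};

// Mirrors the arbiter branch above: consume every committed entry so the
// caller sees iter.valid() == false, but apply nothing to any state machine.
void arbiter_drain(FakeIterator& iter) {
    while (iter.valid()) {
        iter.next();
    }
}
```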
@@ -352,8 +358,11 @@ void FSMCaller::do_snapshot_save(SaveSnapshotClosure* done) {
done->Run();
return;
}
-
-_fsm->on_snapshot_save(writer, done);
+if (!_node->arbiter()) {
+    _fsm->on_snapshot_save(writer, done);
+} else {
+    done->Run();
+}
return;
}

@@ -402,7 +411,9 @@ void FSMCaller::do_snapshot_load(LoadSnapshotClosure* done) {
return done->Run();
}

-ret = _fsm->on_snapshot_load(reader);
+if (!_node->arbiter()) {
+    ret = _fsm->on_snapshot_load(reader);
+}
if (ret != 0) {
done->status().set_error(ret, "StateMachine on_snapshot_load failed");
done->Run();
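Taken together, the three fsm_caller hunks make an arbiter's state machine unreachable. For contrast, a sketch of the user-level workaround this replaces, assuming braft's public StateMachine interface from <braft/raft.h>: a no-op state machine discards payloads too, but unlike the arbiter path it would still download and store the leader's full snapshots whenever it lags.

```cpp
// Illustrative no-op state machine, not part of this patch.
#include <braft/raft.h>

class NoopStateMachine : public braft::StateMachine {
public:
    void on_apply(braft::Iterator& iter) override {
        for (; iter.valid(); iter.next()) {
            // discard user payloads; only quorum bookkeeping matters
        }
    }
    void on_snapshot_save(braft::SnapshotWriter* writer,
                          braft::Closure* done) override {
        done->Run();  // persist no user data
    }
    int on_snapshot_load(braft::SnapshotReader* reader) override {
        return 0;     // nothing to restore
    }
};
```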
27 changes: 27 additions & 0 deletions src/braft/log_manager.cpp
@@ -24,6 +24,7 @@
#include <brpc/reloadable_flags.h> // BRPC_VALIDATE_GFLAG
#include "braft/storage.h" // LogStorage
#include "braft/fsm_caller.h" // FSMCaller
+#include "braft/snapshot.h" // VirtualSnapshotReader

namespace braft {

@@ -967,4 +968,30 @@ butil::Status LogManager::check_consistency() {
return butil::Status(-1, "Impossible condition");
}

+SnapshotReader* LogManager::get_virtual_snapshot() {
+    BAIDU_SCOPED_LOCK(_mutex);
+    LogId last_id;
+    if (_last_log_index >= _first_log_index) {
+        last_id = LogId(_last_log_index, unsafe_get_term(_last_log_index));
+    } else {
+        last_id = _last_snapshot_id;
+    }
+    ConfigurationEntry conf_entry;
+    _config_manager->get(last_id.index, &conf_entry);
+    SnapshotMeta meta;
+    meta.set_last_included_index(last_id.index);
+    meta.set_last_included_term(last_id.term);
+    for (Configuration::const_iterator
+            iter = conf_entry.conf.begin();
+            iter != conf_entry.conf.end(); ++iter) {
+        *meta.add_peers() = iter->to_string();
+    }
+    for (Configuration::const_iterator
+            iter = conf_entry.old_conf.begin();
+            iter != conf_entry.old_conf.end(); ++iter) {
+        *meta.add_old_peers() = iter->to_string();
+    }
+    return new VirtualSnapshotReader(meta);
+}

} // namespace braft
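The "virtual" snapshot is pure metadata: the last log id plus the configuration at that index, with no data files behind it. A hedged sketch of the meta such a reader carries, assuming SnapshotMeta comes from braft/raft.pb.h; the values and the helper name are illustrative:

```cpp
#include <braft/raft.pb.h>  // assumed home of braft::SnapshotMeta

braft::SnapshotMeta make_virtual_meta() {
    braft::SnapshotMeta meta;
    meta.set_last_included_index(1042);  // e.g. LogManager's last log id
    meta.set_last_included_term(7);
    meta.add_peers("10.0.0.1:8100:0");   // current configuration,
    meta.add_peers("10.0.0.2:8100:0");   // serialized PeerId strings ...
    meta.add_peers("10.0.0.3:8100:0");   // ... including the arbiter
    return meta;
}
```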
2 changes: 2 additions & 0 deletions src/braft/log_manager.h
@@ -149,6 +149,8 @@ class BAIDU_CACHELINE_ALIGNMENT LogManager {
// Get the internal status of LogManager.
void get_status(LogManagerStatus* status);

+SnapshotReader* get_virtual_snapshot();

private:
friend class AppendBatcher;
struct WaitMeta {
140 changes: 131 additions & 9 deletions src/braft/node.cpp
@@ -157,7 +157,8 @@ NodeImpl::NodeImpl(const GroupId& group_id, const PeerId& peer_id)
, _append_entries_cache(NULL)
, _append_entries_cache_version(0)
, _node_readonly(false)
-, _majority_nodes_readonly(false) {
+, _majority_nodes_readonly(false)
+, _degraded(false) {
butil::string_printf(&_v_group_id, "%s_%d", _group_id.c_str(), _server_id.idx);
AddRef();
g_num_nodes << 1;
@@ -606,6 +607,7 @@ int NodeImpl::init(const NodeOptions& options) {
_state = STATE_FOLLOWER;

LOG(INFO) << "node " << _group_id << ":" << _server_id << " init,"
+<< " arbiter: " << arbiter()
<< " term: " << _current_term
<< " last_log_id: " << _log_manager->last_log_id()
<< " conf: " << _conf.conf
@@ -771,25 +773,26 @@ void NodeImpl::on_caughtup(const PeerId& peer, int64_t term,
_conf_ctx.on_caughtup(version, peer, false);
}

-void NodeImpl::check_dead_nodes(const Configuration& conf, int64_t now_ms) {
+void NodeImpl::check_dead_nodes(const Configuration& conf, int64_t now_ms,
+                                Configuration &alive_nodes) {
std::vector<PeerId> peers;
conf.list_peers(&peers);
-size_t alive_count = 0;
+alive_nodes.reset();
Configuration dead_nodes; // for easily print
for (size_t i = 0; i < peers.size(); i++) {
if (peers[i] == _server_id) {
-++alive_count;
+alive_nodes.add_peer(peers[i]);
continue;
}

if (now_ms - _replicator_group.last_rpc_send_timestamp(peers[i])
<= _options.election_timeout_ms) {
-++alive_count;
+alive_nodes.add_peer(peers[i]);
continue;
}
dead_nodes.add_peer(peers[i]);
}
-if (alive_count >= peers.size() / 2 + 1) {
+if (alive_nodes.size() >= peers.size() / 2 + 1) {
return;
}
LOG(WARNING) << "node " << node_id()
@@ -802,6 +805,71 @@ void NodeImpl::handle_stepdown_timeout() {
step_down(_current_term, false, status);
}

+bool NodeImpl::check_degraded(const Configuration &conf,
+                              const Configuration &alive_nodes) {
+    std::vector<PeerId> peers;
+    alive_nodes.list_peers(&peers);
+    std::vector<PeerId> alive_non_arbiters;
+    for (size_t i = 0; i < peers.size(); i++) {
+        if (peers[i] == _server_id || !_replicator_group.arbiter(peers[i])) {
+            alive_non_arbiters.push_back(peers[i]);
+        }
+    }
+
+    if (alive_non_arbiters.size() < conf.size() / 2 + 1) {
+        LOG(WARNING) << "node " << node_id()
+                     << " term " << _current_term
+                     << " alive non-arbiter nodes don't satisfy quorum"
+                     << " for conf: " << conf
+                     << " alive nodes: " << alive_nodes
+                     << " alive non-arbiter nodes: " << Configuration(alive_non_arbiters);
+        return true;
+    }
+
+    // not in degraded mode, skip further checks
+    if (!_degraded) {
+        return false;
+    }
+
+    // exit degraded mode only if a majority has already caught up
+    Configuration caughtup_nodes;
+    for (size_t i = 0; i < alive_non_arbiters.size(); i++) {
+        if (alive_non_arbiters[i] == _server_id ||
+            _replicator_group.is_caughtup(alive_non_arbiters[i], _options.catchup_margin)) {
+            caughtup_nodes.add_peer(alive_non_arbiters[i]);
+        }
+    }
+    LOG(WARNING) << "node " << node_id()
+                 << " term " << _current_term
+                 << " in degraded mode and alive non-arbiter nodes satisfy quorum"
+                 << " for conf: " << conf
+                 << " alive nodes: " << alive_nodes
+                 << " alive non-arbiter nodes: " << Configuration(alive_non_arbiters)
+                 << " alive non-arbiter caught-up nodes: " << caughtup_nodes;
+    return caughtup_nodes.size() < conf.size() / 2 + 1;
+}

+void NodeImpl::enter_degraded_mode() {
+    if (!_degraded) {
+        LOG(WARNING) << "node " << node_id()
+                     << " term " << _current_term
+                     << " enter degraded mode"
+                     << " last_committed_index " << _ballot_box->last_committed_index();
+        _degraded = true;
+        _replicator_group.enter_degraded_mode();
+    }
+}

+void NodeImpl::exit_degraded_mode() {
+    if (_degraded) {
+        LOG(WARNING) << "node " << node_id()
+                     << " term " << _current_term
+                     << " exit degraded mode";
+        _degraded = false;
+        _replicator_group.exit_degraded_mode();
+    }
+}

void NodeImpl::handle_stepdown_timeout() {
BAIDU_SCOPED_LOCK(_mutex);

@@ -814,9 +882,25 @@ }
}

int64_t now = butil::monotonic_time_ms();
-check_dead_nodes(_conf.conf, now);
+Configuration alive_nodes, old_alive_nodes;
+check_dead_nodes(_conf.conf, now, alive_nodes);
if (!_conf.old_conf.empty()) {
+    check_dead_nodes(_conf.old_conf, now, old_alive_nodes);
+}
+
+if (_state != STATE_LEADER) {
+    return;
+}
+
+bool degraded = check_degraded(_conf.conf, alive_nodes);
+if (!_conf.old_conf.empty()) {
-    check_dead_nodes(_conf.old_conf, now);
+    degraded |= check_degraded(_conf.old_conf, old_alive_nodes);
}
+
+if (!degraded) {
+    exit_degraded_mode();
+} else {
+    enter_degraded_mode();
+}
}

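Since check_degraded() gates both entry to and exit from degraded mode, here is a self-contained sketch of that decision with braft types reduced to counts, followed by a worked example; the function and numbers are illustrative. Note the hysteresis: entering needs only a lost quorum of live non-arbiters, while leaving additionally requires a caught-up quorum, so the mode does not flap while replicas resynchronize.

```cpp
#include <cassert>
#include <cstddef>

// Sketch of check_degraded()'s decision; conf_size counts every member,
// arbiters included, exactly as conf.size() does above.
bool should_be_degraded(bool currently_degraded,
                        std::size_t conf_size,
                        std::size_t alive_non_arbiters,
                        std::size_t caughtup_non_arbiters) {
    const std::size_t quorum = conf_size / 2 + 1;
    if (alive_non_arbiters < quorum) {
        return true;                        // enter (or stay in) degraded mode
    }
    if (!currently_degraded) {
        return false;                       // healthy and not degraded
    }
    return caughtup_non_arbiters < quorum;  // exit only once caught up
}

int main() {
    // A 5-node group with one arbiter loses two data replicas. The leader
    // still holds a 3/5 quorum (check_dead_nodes does not step it down),
    // but only 2 non-arbiters are alive, so it enters degraded mode.
    assert(should_be_degraded(false, 5, 2, 0));
    // Both replicas return but only one has caught up: stay degraded.
    assert(should_be_degraded(true, 5, 4, 2));
    // A caught-up majority restores normal mode.
    assert(!should_be_degraded(true, 5, 4, 3));
    return 0;
}
```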
@@ -1053,6 +1137,12 @@ void NodeImpl::handle_election_timeout() {
_leader_id.to_string().c_str());
reset_leader_id(empty_id, status);

+if (arbiter()) {
+    BRAFT_VLOG << "arbiter node " << _group_id << ":" << _server_id
+               << " lost connection from leader";
+    return;
+}

return pre_vote(&lck, triggered);
// Don't touch any thing of *this ever after
}
@@ -1090,6 +1180,16 @@ void NodeImpl::handle_timeout_now_request(brpc::Controller* controller,
<< state2str(saved_state) << " at term=" << saved_term;
return;
}
+if (arbiter()) {
+    const int64_t saved_term = _current_term;
+    response->set_term(_current_term);
+    response->set_success(false);
+    lck.unlock();
+    LOG(WARNING) << "arbiter node " << _group_id << ":" << _server_id
+                 << " received handle_timeout_now_request"
+                 << " at term=" << saved_term;
+    return;
+}
const butil::EndPoint remote_side = controller->remote_side();
const int64_t saved_term = _current_term;
if (FLAGS_raft_enable_leader_lease) {
@@ -1201,6 +1301,12 @@ int NodeImpl::transfer_leadership_to(const PeerId& peer) {
<< " which doesn't belong to " << _conf.conf;
return EINVAL;
}
+if (_replicator_group.arbiter(peer_id)) {
+    LOG(INFO) << "node " << _group_id << ":" << _server_id
+              << " refused to transfer leadership to peer "
+              << peer_id << " which is an arbiter";
+    return EINVAL;
+}
const int64_t last_log_index = _log_manager->last_log_index();
const int rc = _replicator_group.transfer_leadership_to(peer_id, last_log_index);
if (rc != 0) {
@@ -1211,6 +1317,9 @@ int NodeImpl::transfer_leadership_to(const PeerId& peer) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " fail to transfer leadership, peer=" << peer_id
<< " whose consecutive_error_times not 0.";
+} else if (rc == ENODATA) {
+    LOG(WARNING) << "node " << _group_id << ":" << _server_id
+                 << " fail to transfer leadership, peer " << peer_id << " is an arbiter";
} else {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " fail to transfer leadership, peer=" << peer_id
@@ -1243,6 +1352,9 @@ butil::Status NodeImpl::vote(int election_timeout_ms) {
if (_state != STATE_FOLLOWER) {
return butil::Status(EPERM, "is not follower");
}
+if (arbiter()) {
+    return butil::Status(EPERM, "is arbiter");
+}
int max_election_timeout_ms = _options.max_clock_drift_ms + _options.election_timeout_ms;
if (election_timeout_ms > max_election_timeout_ms) {
return butil::Status(EINVAL, "election_timeout_ms larger than safety threshold");
@@ -1789,6 +1901,7 @@ void NodeImpl::step_down(const int64_t term, bool wakeup_a_candidate,
// _conf_ctx.reset() will stop replicators of catching up nodes
_conf_ctx.reset();
_majority_nodes_readonly = false;
+_degraded = false;

clear_append_entries_cache();

@@ -2478,6 +2591,7 @@ void NodeImpl::handle_append_entries_request(brpc::Controller* cntl,
response->set_term(_current_term);
response->set_last_log_index(_log_manager->last_log_index());
response->set_readonly(_node_readonly);
+response->set_arbiter(arbiter());
lck.unlock();
// see the comments at FollowerStableClosure::run()
_ballot_box->set_last_committed_index(
@@ -2701,10 +2815,12 @@ void NodeImpl::describe(std::ostream& os, bool use_html) {
_replicator_group.list_replicators(&replicators);
const int64_t leader_timestamp = _follower_lease.last_leader_timestamp();
const bool readonly = (_node_readonly || _majority_nodes_readonly);
+const bool degraded = _degraded;
lck.unlock();
const char *newline = use_html ? "<br>" : "\r\n";
os << "peer_id: " << _server_id << newline;
os << "state: " << state2str(st) << newline;
+os << "arbiter: " << arbiter() << newline;
os << "readonly: " << readonly << newline;
os << "term: " << term << newline;
os << "conf_index: " << conf_index << newline;
@@ -2725,7 +2841,8 @@ void NodeImpl::describe(std::ostream& os, bool use_html) {
// info of configuration change
if (st == STATE_LEADER) {
os << "changing_conf: " << is_changing_conf
-    << " stage: " << conf_statge << newline;
+    << " stage: " << conf_statge << newline
+    << "degraded: " << degraded << newline;
}
if (!new_peers.empty()) {
os << "new_peers:";
@@ -3578,6 +3695,11 @@ int64_t NodeImpl::last_leader_active_timestamp() {
return timestamp;
}

+bool NodeImpl::degraded() {
+    BAIDU_SCOPED_LOCK(_mutex);
+    return _degraded;
+}

struct LastActiveTimestampCompare {
bool operator()(const int64_t& a, const int64_t& b) {
return a > b;