Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft learner support. #386

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/braft/ballot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ int Ballot::init(const Configuration& conf, const Configuration* old_conf) {
_peers.reserve(conf.size());
for (Configuration::const_iterator
iter = conf.begin(); iter != conf.end(); ++iter) {
_peers.push_back(*iter);
if (!iter->learner) {
_peers.push_back(*iter);
}
}
_quorum = _peers.size() / 2 + 1;
if (!old_conf) {
Expand All @@ -39,7 +41,9 @@ int Ballot::init(const Configuration& conf, const Configuration* old_conf) {
_old_peers.reserve(old_conf->size());
for (Configuration::const_iterator
iter = old_conf->begin(); iter != old_conf->end(); ++iter) {
_old_peers.push_back(*iter);
if (!iter->learner) {
_old_peers.push_back(*iter);
}
}
_old_quorum = _old_peers.size() / 2 + 1;
return 0;
Expand Down
3 changes: 3 additions & 0 deletions src/braft/ballot_box.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ int BallotBox::init(const BallotBoxOptions &options) {

int BallotBox::commit_at(
int64_t first_log_index, int64_t last_log_index, const PeerId& peer) {
if (peer.learner) {
return 0;
}
// FIXME(chenzhangyi01): The cricital section is unacceptable because it
// blocks all the other Replicators and LogManagers
std::unique_lock<raft_mutex_t> lck(_mutex);
Expand Down
16 changes: 12 additions & 4 deletions src/braft/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,20 @@ typedef std::string VersionedGroupId;
struct PeerId {
butil::EndPoint addr; // ip+port.
int idx; // idx in same addr, default 0
bool learner;

PeerId() : idx(0) {}
explicit PeerId(butil::EndPoint addr_) : addr(addr_), idx(0) {}
PeerId(butil::EndPoint addr_, int idx_) : addr(addr_), idx(idx_) {}
PeerId(butil::EndPoint addr_, int idx_, bool learner_=false) : addr(addr_), idx(idx_), learner(learner_) {}
PeerId() : PeerId(butil::EndPoint(), 0) {}
explicit PeerId(butil::EndPoint addr_) : PeerId(addr_, 0) {}
/*intended implicit*/PeerId(const std::string& str)
{ CHECK_EQ(0, parse(str)); }
PeerId(const PeerId& id) : addr(id.addr), idx(id.idx) {}
PeerId(const PeerId& id) : PeerId(id.addr, id.idx, id.learner) {}

void reset() {
addr.ip = butil::IP_ANY;
addr.port = 0;
idx = 0;
learner = false;
}

bool is_empty() const {
Expand All @@ -59,6 +61,9 @@ struct PeerId {
int parse(const std::string& str) {
reset();
char ip_str[64];
if (!str.empty() && str.back() == 'l') {
learner = true;
}
if (2 > sscanf(str.c_str(), "%[^:]%*[:]%d%*[:]%d", ip_str, &addr.port, &idx)) {
reset();
return -1;
Expand All @@ -85,6 +90,9 @@ inline bool operator<(const PeerId& id1, const PeerId& id2) {
}
}

// intentionally leave behind `learner` field.
// So when we change a peer from learner to normal or normal to learner
// raft will do nothing
inline bool operator==(const PeerId& id1, const PeerId& id2) {
return (id1.addr == id2.addr && id1.idx == id2.idx);
}
Expand Down
40 changes: 28 additions & 12 deletions src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,11 @@ int NodeImpl::init(const NodeOptions& options) {
return -1;
}

CHECK_EQ(0, _vote_timer.init(this, options.election_timeout_ms + options.max_clock_drift_ms));
CHECK_EQ(0, _election_timer.init(this, options.election_timeout_ms));
CHECK_EQ(0, _stepdown_timer.init(this, options.election_timeout_ms));
if (!options.learner) {
CHECK_EQ(0, _vote_timer.init(this, options.election_timeout_ms + options.max_clock_drift_ms));
CHECK_EQ(0, _election_timer.init(this, options.election_timeout_ms));
CHECK_EQ(0, _stepdown_timer.init(this, options.election_timeout_ms));
}
CHECK_EQ(0, _snapshot_timer.init(this, options.snapshot_interval_s * 1000));

_config_manager = new ConfigurationManager();
Expand Down Expand Up @@ -633,7 +635,7 @@ int NodeImpl::init(const NodeOptions& options) {
// conditions
std::unique_lock<raft_mutex_t> lck(_mutex);
if (_conf.stable() && _conf.conf.size() == 1u
&& _conf.conf.contains(_server_id)) {
&& _conf.conf.contains(_server_id) && !options.learner) {
// The group contains only this server which must be the LEADER, trigger
// the timer immediately.
elect_self(&lck);
Expand Down Expand Up @@ -1189,18 +1191,32 @@ int NodeImpl::transfer_leadership_to(const PeerId& peer) {
if (_replicator_group.find_the_next_candidate(&peer_id, _conf) != 0) {
return -1;
}
} else {
bool found = false;
for (Configuration::const_iterator iter = _conf.begin(); iter != _conf.end(); ++iter) {
if (*iter == peer_id) {
if (iter->learner) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " refused to transfer leadership to peer " << peer_id
<< " which is a learner";
return EINVAL;
}
found = true;
break;
}
}
if (!found) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " refused to transfer leadership to peer " << peer_id
<< " which doesn't belong to " << _conf.conf;
return EINVAL;
}
}
if (peer_id == _server_id) {
LOG(INFO) << "node " << _group_id << ":" << _server_id
<< " transfering leadership to self";
return 0;
}
if (!_conf.contains(peer_id)) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " refused to transfer leadership to peer " << peer_id
<< " which doesn't belong to " << _conf.conf;
return EINVAL;
}
const int64_t last_log_index = _log_manager->last_log_index();
const int rc = _replicator_group.transfer_leadership_to(peer_id, last_log_index);
if (rc != 0) {
Expand Down Expand Up @@ -1609,7 +1625,7 @@ void NodeImpl::pre_vote(std::unique_lock<raft_mutex_t>* lck, bool triggered) {

for (std::set<PeerId>::const_iterator
iter = peers.begin(); iter != peers.end(); ++iter) {
if (*iter == _server_id) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

计算 last_leader_active_timestamp、check_dead_nodes、check_majority_nodes_readonly 的时候也应该过滤掉 learner 吧

if (*iter == _server_id || iter->learner) {
continue;
}
brpc::ChannelOptions options;
Expand Down Expand Up @@ -1715,7 +1731,7 @@ void NodeImpl::request_peers_to_vote(const std::set<PeerId>& peers,
const DisruptedLeader& disrupted_leader) {
for (std::set<PeerId>::const_iterator
iter = peers.begin(); iter != peers.end(); ++iter) {
if (*iter == _server_id) {
if (*iter == _server_id || iter->learner) {
continue;
}
brpc::ChannelOptions options;
Expand Down
6 changes: 6 additions & 0 deletions src/braft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,11 @@ struct NodeOptions {
// Default: false
bool disable_cli;

// If true, this node will neither participate in elections nor be inclued in quorum
// learner will not trigger election_timeout or vote
// Default: false
bool learner;

// Construct a default instance
NodeOptions();

Expand All @@ -609,6 +614,7 @@ inline NodeOptions::NodeOptions()
, snapshot_file_system_adaptor(NULL)
, snapshot_throttle(NULL)
, disable_cli(false)
, learner(false)
{}

inline int NodeOptions::get_catchup_timeout_ms() {
Expand Down
2 changes: 1 addition & 1 deletion src/braft/replicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1514,7 +1514,7 @@ int ReplicatorGroup::find_the_next_candidate(
int64_t max_index = 0;
for (std::map<PeerId, ReplicatorIdAndStatus>::const_iterator
iter = _rmap.begin(); iter != _rmap.end(); ++iter) {
if (!conf.contains(iter->first)) {
if (!conf.contains(iter->first) || iter->first.learner) {
continue;
}
const int64_t next_index = Replicator::get_next_index(iter->second.id);
Expand Down