Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: replace the type of PeerId::addr from butil::EndPoint to string #429

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions src/braft/cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ static butil::Status get_leader(const GroupId& group_id, const Configuration& co
for (Configuration::const_iterator
iter = conf.begin(); iter != conf.end(); ++iter) {
brpc::Channel channel;
if (channel.Init(iter->addr, NULL) != 0) {
if (channel.Init(iter->addr.c_str(), NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
iter->to_string().c_str());
}
Expand Down Expand Up @@ -73,7 +73,7 @@ butil::Status add_peer(const GroupId& group_id, const Configuration& conf,
butil::Status st = get_leader(group_id, conf, &leader_id);
BRAFT_RETURN_IF(!st.ok(), st);
brpc::Channel channel;
if (channel.Init(leader_id.addr, NULL) != 0) {
if (channel.Init(leader_id.addr.c_str(), NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
leader_id.to_string().c_str());
}
Expand All @@ -93,11 +93,11 @@ butil::Status add_peer(const GroupId& group_id, const Configuration& conf,
}
Configuration old_conf;
for (int i = 0; i < response.old_peers_size(); ++i) {
old_conf.add_peer(response.old_peers(i));
old_conf.add_peer(PeerId::from_string(response.old_peers(i)));
}
Configuration new_conf;
for (int i = 0; i < response.new_peers_size(); ++i) {
new_conf.add_peer(response.new_peers(i));
new_conf.add_peer(PeerId::from_string(response.new_peers(i)));
}
LOG(INFO) << "Configuration of replication group `" << group_id
<< "' changed from " << old_conf
Expand All @@ -111,7 +111,7 @@ butil::Status remove_peer(const GroupId& group_id, const Configuration& conf,
butil::Status st = get_leader(group_id, conf, &leader_id);
BRAFT_RETURN_IF(!st.ok(), st);
brpc::Channel channel;
if (channel.Init(leader_id.addr, NULL) != 0) {
if (channel.Init(leader_id.addr.c_str(), NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
leader_id.to_string().c_str());
}
Expand All @@ -131,11 +131,11 @@ butil::Status remove_peer(const GroupId& group_id, const Configuration& conf,
}
Configuration old_conf;
for (int i = 0; i < response.old_peers_size(); ++i) {
old_conf.add_peer(response.old_peers(i));
old_conf.add_peer(PeerId::from_string(response.old_peers(i)));
}
Configuration new_conf;
for (int i = 0; i < response.new_peers_size(); ++i) {
new_conf.add_peer(response.new_peers(i));
new_conf.add_peer(PeerId::from_string(response.new_peers(i)));
}
LOG(INFO) << "Configuration of replication group `" << group_id
<< "' changed from " << old_conf
Expand All @@ -150,7 +150,7 @@ butil::Status reset_peer(const GroupId& group_id, const PeerId& peer_id,
return butil::Status(EINVAL, "new_conf is empty");
}
brpc::Channel channel;
if (channel.Init(peer_id.addr, NULL) != 0) {
if (channel.Init(peer_id.addr.c_str(), NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
peer_id.to_string().c_str());
}
Expand All @@ -176,7 +176,7 @@ butil::Status reset_peer(const GroupId& group_id, const PeerId& peer_id,
butil::Status snapshot(const GroupId& group_id, const PeerId& peer_id,
const CliOptions& options) {
brpc::Channel channel;
if (channel.Init(peer_id.addr, NULL) != 0) {
if (channel.Init(peer_id.addr.c_str(), NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
peer_id.to_string().c_str());
}
Expand Down Expand Up @@ -204,7 +204,7 @@ butil::Status change_peers(const GroupId& group_id, const Configuration& conf,
LOG(INFO) << "conf=" << conf << " leader=" << leader_id
<< " new_peers=" << new_peers;
brpc::Channel channel;
if (channel.Init(leader_id.addr, NULL) != 0) {
if (channel.Init(leader_id.addr.c_str(), NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
leader_id.to_string().c_str());
}
Expand All @@ -228,11 +228,11 @@ butil::Status change_peers(const GroupId& group_id, const Configuration& conf,
}
Configuration old_conf;
for (int i = 0; i < response.old_peers_size(); ++i) {
old_conf.add_peer(response.old_peers(i));
old_conf.add_peer(PeerId::from_string(response.old_peers(i)));
}
Configuration new_conf;
for (int i = 0; i < response.new_peers_size(); ++i) {
new_conf.add_peer(response.new_peers(i));
new_conf.add_peer(PeerId::from_string(response.new_peers(i)));
}
LOG(INFO) << "Configuration of replication group `" << group_id
<< "' changed from " << old_conf
Expand All @@ -250,7 +250,7 @@ butil::Status transfer_leader(const GroupId& group_id, const Configuration& conf
return butil::Status::OK();
}
brpc::Channel channel;
if (channel.Init(leader_id.addr, NULL) != 0) {
if (channel.Init(leader_id.addr.c_str(), NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
leader_id.to_string().c_str());
}
Expand Down
6 changes: 3 additions & 3 deletions src/braft/cli_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ static void add_peer_returned(brpc::Controller* cntl,
for (size_t i = 0; i < old_peers.size(); ++i) {
response->add_old_peers(old_peers[i].to_string());
response->add_new_peers(old_peers[i].to_string());
if (old_peers[i] == request->peer_id()) {
if (old_peers[i] == PeerId::from_string(request->peer_id())) {
already_exists = true;
}
}
Expand Down Expand Up @@ -94,7 +94,7 @@ static void remove_peer_returned(brpc::Controller* cntl,
}
for (size_t i = 0; i < old_peers.size(); ++i) {
response->add_old_peers(old_peers[i].to_string());
if (old_peers[i] != request->peer_id()) {
if (old_peers[i] != PeerId::from_string(request->peer_id())) {
response->add_new_peers(old_peers[i].to_string());
}
}
Expand Down Expand Up @@ -233,7 +233,7 @@ butil::Status CliServiceImpl::get_node(scoped_refptr<NodeImpl>* node,
const GroupId& group_id,
const std::string& peer_id) {
if (!peer_id.empty()) {
*node = global_node_manager->get(group_id, peer_id);
*node = global_node_manager->get(group_id, PeerId::from_string(peer_id));
if (!(*node)) {
return butil::Status(ENOENT, "Fail to find node %s in group %s",
peer_id.c_str(),
Expand Down
44 changes: 24 additions & 20 deletions src/braft/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include <ostream>
#include <vector>
#include <set>
#include <map>
#include "butil/string_printf.h"
#include <butil/strings/string_piece.h>
#include <butil/endpoint.h>
#include <butil/logging.h>
Expand All @@ -41,41 +41,41 @@ enum Role {

// Represent a participant in a replicating group.
struct PeerId {
butil::EndPoint addr; // ip+port.
int idx; // idx in same addr, default 0
std::string addr; // ip+port.
int idx = 0; // idx in same addr, default 0
Role role = REPLICA;

PeerId() : idx(0), role(REPLICA) {}
explicit PeerId(butil::EndPoint addr_) : addr(addr_), idx(0), role(REPLICA) {}
PeerId(butil::EndPoint addr_, int idx_) : addr(addr_), idx(idx_), role(REPLICA) {}
PeerId(butil::EndPoint addr_, int idx_, bool witness) : addr(addr_), idx(idx_) {

PeerId(std::string addr_, int idx_) : addr(addr_), idx(idx_), role(REPLICA) {}
PeerId(std::string addr_, int idx_, bool witness) : addr(addr_), idx(idx_) {
if (witness) {
this->role = WITNESS;
}
}

/*intended implicit*/PeerId(const std::string& str)
{ CHECK_EQ(0, parse(str)); }
PeerId(const PeerId& id) : addr(id.addr), idx(id.idx), role(id.role) {}

void reset() {
addr.ip = butil::IP_ANY;
addr.port = 0;
addr.clear();
idx = 0;
role = REPLICA;
}

bool is_empty() const {
return (addr.ip == butil::IP_ANY && addr.port == 0 && idx == 0);
return (addr.empty() && idx == 0);
}

bool is_witness() const {
return role == WITNESS;
}

int parse(const std::string& str) {
reset();
char ip_str[64];
char ip_str[1024];
int32_t port;
int value = REPLICA;
if (2 > sscanf(str.c_str(), "%[^:]%*[:]%d%*[:]%d%*[:]%d", ip_str, &addr.port, &idx, &value)) {
if (2 > sscanf(str.c_str(), "%[^:]%*[:]%d%*[:]%d%*[:]%d", ip_str, &port, &idx, &value)) {
reset();
return -1;
}
Expand All @@ -84,20 +84,24 @@ struct PeerId {
reset();
return -1;
}
if (0 != butil::str2ip(ip_str, &addr.ip)) {
reset();
return -1;
}

addr = butil::string_printf("%s:%d", ip_str, port);

return 0;
}

std::string to_string() const {
char str[128];
snprintf(str, sizeof(str), "%s:%d:%d", butil::endpoint2str(addr).c_str(), idx, int(role));
return std::string(str);
return butil::string_printf("%s:%d:%d", addr.c_str(), idx, int(role));
}

PeerId& operator=(const PeerId& rhs) = default;

static PeerId from_string(const std::string&s) {
PeerId p;
CHECK_EQ(0,p.parse(s));

return p;
}
};

inline bool operator<(const PeerId& id1, const PeerId& id2) {
Expand Down
4 changes: 2 additions & 2 deletions src/braft/file_system_adaptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,15 @@ ssize_t BufferedSequentialWriteFileAdaptor::write(const butil::IOBuf& data, off_
BRAFT_VLOG << "begin write offset " << offset << ", data_size " << data.size()
<< ", buffer_offset " << _buffer_offset
<< ", buffer_size " << _buffer_size;
if (offset < _buffer_offset + _buffer_size) {
if (offset < _buffer_offset + static_cast<off_t>(_buffer_size)) {
LOG(WARNING) << "Fail to write into buffered file adaptor with invalid range"
<< ", offset: " << offset
<< ", data_size: " << data.size()
<< ", buffer_offset: " << _buffer_offset
<< ", buffer_size: " << _buffer_size;
errno = EINVAL;
return -1;
} else if (offset > _buffer_offset + _buffer_size) {
} else if (offset > _buffer_offset + static_cast<off_t>(_buffer_size)) {
// passby hole
CHECK(_buffer_size == 0);
BRAFT_VLOG << "seek to new offset " << offset << " as there is hole";
Expand Down
4 changes: 2 additions & 2 deletions src/braft/fsm_caller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ int FSMCaller::run(void* meta, bthread::TaskIterator<ApplyTask>& iter) {
}
int64_t max_committed_index = -1;
int64_t counter = 0;
size_t batch_size = FLAGS_raft_fsm_caller_commit_batch;
int64_t batch_size = FLAGS_raft_fsm_caller_commit_batch;
for (; iter; ++iter) {
if (iter->type == COMMITTED && counter < batch_size) {
if (iter->committed_index > max_committed_index) {
Expand Down Expand Up @@ -417,7 +417,7 @@ void FSMCaller::do_snapshot_load(LoadSnapshotClosure* done) {
// Joint stage is not supposed to be noticeable by end users.
Configuration conf;
for (int i = 0; i < meta.peers_size(); ++i) {
conf.add_peer(meta.peers(i));
conf.add_peer(PeerId::from_string(meta.peers(i)));
}
_fsm->on_configuration_committed(conf, meta.last_included_index());
}
Expand Down
4 changes: 2 additions & 2 deletions src/braft/log_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ butil::Status parse_configuration_meta(const butil::IOBuf& data, LogEntry* entry
}
entry->peers = new std::vector<PeerId>;
for (int j = 0; j < meta.peers_size(); ++j) {
entry->peers->push_back(PeerId(meta.peers(j)));
entry->peers->push_back(PeerId::from_string(meta.peers(j)));
}
if (meta.old_peers_size() > 0) {
entry->old_peers = new std::vector<PeerId>;
for (int i = 0; i < meta.old_peers_size(); i++) {
entry->old_peers->push_back(PeerId(meta.old_peers(i)));
entry->old_peers->push_back(PeerId::from_string(meta.old_peers(i)));
}
}
return status;
Expand Down
4 changes: 2 additions & 2 deletions src/braft/log_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -629,11 +629,11 @@ void LogManager::set_snapshot(const SnapshotMeta* meta) {
}
Configuration conf;
for (int i = 0; i < meta->peers_size(); ++i) {
conf.add_peer(meta->peers(i));
conf.add_peer(PeerId::from_string(meta->peers(i)));
}
Configuration old_conf;
for (int i = 0; i < meta->old_peers_size(); ++i) {
old_conf.add_peer(meta->old_peers(i));
old_conf.add_peer(PeerId::from_string(meta->old_peers(i)));
}
ConfigurationEntry entry;
entry.id = LogId(meta->last_included_index(), meta->last_included_term());
Expand Down
14 changes: 7 additions & 7 deletions src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ int NodeImpl::init(const NodeOptions& options) {
_options = options;

// check _server_id
if (butil::IP_ANY == _server_id.addr.ip) {
if ( _server_id.addr.empty()) {
LOG(ERROR) << "Group " << _group_id
<< " Node can't started from IP_ANY";
return -1;
Expand Down Expand Up @@ -972,8 +972,8 @@ void NodeImpl::snapshot(Closure* done) {
}

void NodeImpl::do_snapshot(Closure* done) {
LOG(INFO) << "node " << _group_id << ":" << _server_id
<< " starts to do snapshot";
VLOG(1) << "node " << _group_id << ":" << _server_id
<< " starts to do snapshot";
if (_snapshot_executor) {
_snapshot_executor->do_snapshot(done);
} else {
Expand Down Expand Up @@ -1654,7 +1654,7 @@ void NodeImpl::pre_vote(std::unique_lock<raft_mutex_t>* lck, bool triggered) {
options.max_retry = 0;
options.connect_timeout_ms = FLAGS_raft_rpc_channel_connect_timeout_ms;
brpc::Channel channel;
if (0 != channel.Init(iter->addr, &options)) {
if (0 != channel.Init(iter->addr.c_str(), &options)) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " channel init failed, addr " << iter->addr;
continue;
Expand Down Expand Up @@ -1760,7 +1760,7 @@ void NodeImpl::request_peers_to_vote(const std::set<PeerId>& peers,
options.connect_timeout_ms = FLAGS_raft_rpc_channel_connect_timeout_ms;
options.max_retry = 0;
brpc::Channel channel;
if (0 != channel.Init(iter->addr, &options)) {
if (0 != channel.Init(iter->addr.c_str(), &options)) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " channel init failed, addr " << iter->addr;
continue;
Expand Down Expand Up @@ -2539,13 +2539,13 @@ void NodeImpl::handle_append_entries_request(brpc::Controller* cntl,
if (entry.peers_size() > 0) {
log_entry->peers = new std::vector<PeerId>;
for (int i = 0; i < entry.peers_size(); i++) {
log_entry->peers->push_back(entry.peers(i));
log_entry->peers->push_back(PeerId::from_string(entry.peers(i)));
}
CHECK_EQ(log_entry->type, ENTRY_TYPE_CONFIGURATION);
if (entry.old_peers_size() > 0) {
log_entry->old_peers = new std::vector<PeerId>;
for (int i = 0; i < entry.old_peers_size(); i++) {
log_entry->old_peers->push_back(entry.old_peers(i));
log_entry->old_peers->push_back(PeerId::from_string(entry.old_peers(i)));
}
}
} else {
Expand Down
11 changes: 5 additions & 6 deletions src/braft/node_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,23 @@ NodeManager::NodeManager() {}

NodeManager::~NodeManager() {}

bool NodeManager::server_exists(butil::EndPoint addr) {
bool NodeManager::server_exists(const std::string& addr) {
BAIDU_SCOPED_LOCK(_mutex);
if (addr.ip != butil::IP_ANY) {
butil::EndPoint any_addr(butil::IP_ANY, addr.port);
if (_addr_set.find(any_addr) != _addr_set.end()) {
if (addr!= std::string()) {
if (_addr_set.find(addr) != _addr_set.end()) {
return true;
}
}
return _addr_set.find(addr) != _addr_set.end();
}

void NodeManager::remove_address(butil::EndPoint addr) {
void NodeManager::remove_address(const std::string& addr) {
BAIDU_SCOPED_LOCK(_mutex);
_addr_set.erase(addr);
}

int NodeManager::add_service(brpc::Server* server,
const butil::EndPoint& listen_address) {
const std::string& listen_address) {
if (server == NULL) {
LOG(ERROR) << "server is NULL";
return -1;
Expand Down
Loading