Skip to content

Commit

Permalink
refactor braft to support hostname as PeerId's endpoint
Browse files Browse the repository at this point in the history
* define a struct named HostNameAddr, as an attribute of PeerId
* wrap a function to build naming_service_url, which is used for brpc channel init
* modify logic about channel init with naming service
  • Loading branch information
xiaolin310 committed Nov 24, 2023
1 parent bb90c51 commit 0db90ba
Show file tree
Hide file tree
Showing 13 changed files with 120 additions and 68 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ CTestTestfile.cmake

# Github aciton
!.github
cmake-build-debug
12 changes: 12 additions & 0 deletions example/counter/counter_hostname_test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
cmake_minimum_required(VERSION 2.8.10)
project(counter C CXX)

option(BRPC_WITH_GLOG "With glog" OFF)
option(EXAMPLE_LINK_SO "Whether examples are linked dynamically" OFF)
option(LINK_TCMALLOC "Link tcmalloc if possible" ON)

Expand All @@ -19,6 +20,16 @@ if (NOT PROTOBUF_PROTOC_EXECUTABLE)
set (PROTOBUF_PROTOC_EXECUTABLE "${PROTO_LIB_DIR}/../bin/protoc")
endif()

if(BRPC_WITH_GLOG)
message(NOTICE "counter BRPC_WITH_GLOG=ON")
find_path(GLOG_INCLUDE_PATH NAMES glog/logging.h)
find_library(GLOG_LIB NAMES glog VERSION ">=0.6.0" REQUIRE)
if((NOT GLOG_INCLUDE_PATH) OR (NOT GLOG_LIB))
message(FATAL_ERROR "Fail to find glog")
endif()
include_directories(${GLOG_INCLUDE_PATH})
endif()

protobuf_generate_cpp(PROTO_SRC PROTO_HEADER counter.proto)
# include PROTO_HEADER
include_directories(${CMAKE_CURRENT_BINARY_DIR})
Expand Down Expand Up @@ -112,6 +123,7 @@ add_executable(counter_server server.cpp ${PROTO_SRC} ${PROTO_HEADER})
set(DYNAMIC_LIB
${CMAKE_THREAD_LIBS_INIT}
${GFLAGS_LIBRARY}
${GLOG_LIB}
${PROTOBUF_LIBRARY}
${GPERFTOOLS_LIBRARIES}
${LEVELDB_LIB}
Expand Down
7 changes: 3 additions & 4 deletions example/counter/counter_hostname_test/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ static void* sender(void* arg) {
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(leader.hostname_);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) {
HostNameAddr2NSUrl(leader.hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), braft::LOAD_BALANCER_NAME, NULL) != 0) {
LOG(ERROR) << "Fail to init channel to " << leader;
bthread_usleep(FLAGS_timeout_ms * 1000L);
continue;
Expand All @@ -73,7 +72,7 @@ static void* sender(void* arg) {

brpc::Controller cntl;
cntl.set_timeout_ms(FLAGS_timeout_ms);
// Randomly select which request we want send;
// Randomly select which request we want to send;
example::CounterResponse response;

if (butil::fast_rand_less_than(100) < (size_t)FLAGS_add_percentage) {
Expand Down
1 change: 1 addition & 0 deletions example/counter/counter_hostname_test/run_server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ for ((i=0; i<$FLAGS_server_num; ++i)); do
cp ./counter_server runtime/$i
cd runtime/$i
${VALGRIND} ./counter_server \
-node_hostname="${IP}" \
-bthread_concurrency=${FLAGS_bthread_concurrency}\
-crash_on_fatal_log=${FLAGS_crash_on_fatal} \
-raft_max_segment_size=${FLAGS_max_segment_size} \
Expand Down
4 changes: 1 addition & 3 deletions example/counter/counter_hostname_test/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ class Counter : public braft::StateMachine {
node_options.disable_cli = FLAGS_disable_cli;
// braft::Node* node = new braft::Node(FLAGS_group, braft::PeerId(addr));
braft::PeerId node_host(addr);
node_host.hostname_.append(FLAGS_node_hostname);
node_host.hostname_.append(":");
node_host.hostname_.append(std::to_string(FLAGS_port));
node_host.hostname_addr = braft::HostNameAddr(FLAGS_node_hostname, FLAGS_port);
node_host.type_ = braft::PeerId::Type::HostName;
braft::Node* node = new braft::Node(FLAGS_group, node_host);
if (node->init(node_options) != 0) {
Expand Down
21 changes: 7 additions & 14 deletions src/braft/cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ static butil::Status get_leader(const GroupId& group_id, const Configuration& co
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(iter->hostname_);
HostNameAddr2NSUrl(iter->hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
iter->to_string().c_str());
Expand Down Expand Up @@ -90,8 +89,7 @@ butil::Status add_peer(const GroupId& group_id, const Configuration& conf,
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(leader_id.hostname_);
HostNameAddr2NSUrl(leader_id.hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
leader_id.to_string().c_str());
Expand Down Expand Up @@ -138,8 +136,7 @@ butil::Status remove_peer(const GroupId& group_id, const Configuration& conf,
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(leader_id.hostname_);
HostNameAddr2NSUrl(leader_id.hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
leader_id.to_string().c_str());
Expand Down Expand Up @@ -187,8 +184,7 @@ butil::Status reset_peer(const GroupId& group_id, const PeerId& peer_id,
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(peer_id.hostname_);
HostNameAddr2NSUrl(peer_id.hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
peer_id.to_string().c_str());
Expand Down Expand Up @@ -223,8 +219,7 @@ butil::Status snapshot(const GroupId& group_id, const PeerId& peer_id,
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(peer_id.hostname_);
HostNameAddr2NSUrl(peer_id.hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
peer_id.to_string().c_str());
Expand Down Expand Up @@ -261,8 +256,7 @@ butil::Status change_peers(const GroupId& group_id, const Configuration& conf,
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(leader_id.hostname_);
HostNameAddr2NSUrl(leader_id.hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
leader_id.to_string().c_str());
Expand Down Expand Up @@ -317,8 +311,7 @@ butil::Status transfer_leader(const GroupId& group_id, const Configuration& conf
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(leader_id.hostname_);
HostNameAddr2NSUrl(leader_id.hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
leader_id.to_string().c_str());
Expand Down
103 changes: 75 additions & 28 deletions src/braft/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,68 @@ enum Role {
WITNESS = 1,
};

struct HostNameAddr {
HostNameAddr() : hostname(""), port(0) {}
explicit HostNameAddr(const std::string& hostname_, uint32_t port_) : hostname(hostname_), port(port_) {
if (port_ > 65535) {
hostname.clear();
port = 0;
}
}

HostNameAddr(const HostNameAddr& rhs) = default;
HostNameAddr(HostNameAddr&& rhs) = default;
HostNameAddr& operator=(const HostNameAddr& addr) = default;
HostNameAddr& operator=(HostNameAddr&& addr) noexcept {
if(&addr == this) {
return *this;
}
hostname = std::move(addr.hostname);
port = addr.port;
return *this;
}

void reset() {
hostname.clear();
port = 0;
}

std::string to_string() const;

std::string hostname;
uint32_t port;
};

inline bool operator<(const HostNameAddr& addr1, const HostNameAddr& addr2) {
return (addr1.hostname != addr2.hostname) ? (addr1.hostname < addr2.hostname) : (addr1.port < addr2.port);
}

inline bool operator==(const HostNameAddr& addr1, const HostNameAddr& addr2) {
return addr1.hostname == addr2.hostname && addr1.port == addr2.port;
}

inline bool operator!=(const HostNameAddr& addr1, const HostNameAddr& addr2) {
return !(addr1 == addr2);
}

inline std::ostream& operator<<(std::ostream& os, const HostNameAddr& addr) {
return os << addr.hostname << ":" << addr.port;
}

inline std::string HostNameAddr::to_string() const {
std::ostringstream oss;
oss << *this;
return oss.str();
}


// Represent a participant in a replicating group.
// Conf like: 172-17-0-1.default.pod.cluster.local:8002:0,172-17-0-2.default.pod.cluster.local:8002:0,172-17-0-3.default.pod.cluster.local:8002:0
struct PeerId {
butil::EndPoint addr; // ip+port.
int idx; // idx in same addr, default 0
Role role = REPLICA;
std::string hostname_; // hostname+port, e.g. www.foo.com:8765
HostNameAddr hostname_addr; // hostname+port.
enum class Type {
EndPoint = 0,
HostName
Expand All @@ -62,24 +117,17 @@ struct PeerId {
}
/*intended implicit*/PeerId(const std::string& str)
{ CHECK_EQ(0, parse(str)); }
PeerId(const PeerId& id) : addr(id.addr), idx(id.idx), role(id.role), hostname_(id.hostname_), type_(id.type_) {}
PeerId(PeerId&& id) : addr(std::move(id.addr)), idx(std::move(id.idx)), role(std::move(id.role)), hostname_(std::move(id.hostname_)),
type_(std::move(id.type_)) {}

PeerId& operator=(const PeerId& id) {
addr = id.addr;
idx = id.idx;
hostname_ = id.hostname_;
type_ = id.type_;
role = id.role;
PeerId(const PeerId& id) = default;
PeerId(PeerId&& id) = default;
PeerId& operator=(const PeerId& id) = default;

return *this;
}

PeerId& operator=(PeerId&& id) {
PeerId& operator=(PeerId&& id) noexcept {
if ( &id == this) {
return *this;
}
addr = std::move(id.addr);
idx = std::move(id.idx);
hostname_ = std::move(id.hostname_);
hostname_addr = std::move(id.hostname_addr);
type_ = std::move(id.type_);
role = std::move(id.role);

Expand All @@ -92,7 +140,7 @@ struct PeerId {
addr.port = 0;
}
else {
hostname_.clear();
hostname_addr.reset();
}
idx = 0;
role = REPLICA;
Expand All @@ -102,7 +150,7 @@ struct PeerId {
if (type_ == Type::EndPoint) {
return (addr.ip == butil::IP_ANY && addr.port == 0 && idx == 0);
} else {
return (hostname_.empty() && idx == 0);
return (hostname_addr.hostname.empty() && hostname_addr.port == 0 && idx == 0);
}
}
bool is_witness() const {
Expand All @@ -124,9 +172,8 @@ struct PeerId {
}
if (0 != butil::str2ip(temp_str, &addr.ip)) {
type_ = Type::HostName;
hostname_.append(temp_str);
hostname_.append(":");
hostname_.append(std::to_string(port));
hostname_addr.hostname = temp_str;
hostname_addr.port = port;
} else {
type_ = Type::EndPoint;
addr.port = port;
Expand All @@ -139,7 +186,7 @@ struct PeerId {
if (type_ == Type::EndPoint) {
snprintf(str, sizeof(str), "%s:%d:%d", butil::endpoint2str(addr).c_str(), idx, int(role));
} else {
snprintf(str, sizeof(str), "%s:%d:%d", hostname_.c_str(), idx, int(role));
snprintf(str, sizeof(str), "%s:%d:%d", hostname_addr.to_string().c_str(), idx, int(role));
}
return std::string(str);
}
Expand All @@ -150,13 +197,13 @@ inline bool operator<(const PeerId& id1, const PeerId& id2) {
if (id1.type_ != id2.type_) {
LOG(WARNING) << "PeerId id1 and PeerId id2 do not have same type(IP Addr or Hostname).";
if (id1.type_ == PeerId::Type::EndPoint) {
if (strcmp(butil::endpoint2str(id1.addr).c_str(), id2.hostname_.c_str()) < 0) {
if (strcmp(butil::endpoint2str(id1.addr).c_str(), id2.hostname_addr.to_string().c_str()) < 0) {
return true;
} else {
return false;
}
} else {
if (strcmp(id1.hostname_.c_str(), butil::endpoint2str(id2.addr).c_str()) < 0) {
if (strcmp(id1.hostname_addr.to_string().c_str(), butil::endpoint2str(id2.addr).c_str()) < 0) {
return true;
} else {
return false;
Expand All @@ -170,10 +217,10 @@ inline bool operator<(const PeerId& id1, const PeerId& id2) {
return id1.addr == id2.addr && id1.idx < id2.idx;
}
} else {
if (id1.hostname_ < id2.hostname_) {
if (id1.hostname_addr < id2.hostname_addr) {
return true;
} else {
return id1.hostname_ == id2.hostname_ && id1.idx < id2.idx;
return id1.hostname_addr == id2.hostname_addr && id1.idx < id2.idx;
}
}
}
Expand All @@ -186,7 +233,7 @@ inline bool operator==(const PeerId& id1, const PeerId& id2) {
if (id1.type_ == PeerId::Type::EndPoint) {
return (id1.addr == id2.addr && id1.idx == id2.idx);
} else {
return (id1.hostname_ == id2.hostname_ && id1.idx == id2.idx);
return (id1.hostname_addr == id2.hostname_addr && id1.idx == id2.idx);
}
}

Expand All @@ -198,7 +245,7 @@ inline std::ostream& operator << (std::ostream& os, const PeerId& id) {
if (id.type_ == PeerId::Type::EndPoint) {
return os << id.addr << ':' << id.idx << ':' << int(id.role);
} else {
return os << id.hostname_ << ':' << id.idx << ':' << int(id.role);
return os << id.hostname_addr << ':' << id.idx << ':' << int(id.role);
}
}

Expand Down
10 changes: 4 additions & 6 deletions src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1662,11 +1662,10 @@ void NodeImpl::pre_vote(std::unique_lock<raft_mutex_t>* lck, bool triggered) {
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(iter->hostname_);
HostNameAddr2NSUrl(iter->hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, &options) != 0) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " channel init failed, hostname " << iter->hostname_;
<< " channel init failed, hostname " << iter->hostname_addr;
continue;
}
}
Expand Down Expand Up @@ -1778,11 +1777,10 @@ void NodeImpl::request_peers_to_vote(const std::set<PeerId>& peers,
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(iter->hostname_);
HostNameAddr2NSUrl(iter->hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, &options) != 0) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " channel init failed, addr " << iter->hostname_;
<< " channel init failed, addr " << iter->hostname_addr;
continue;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/braft/raft_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,8 @@ int FileBasedSingleMetaStorage::save() {
meta.set_term(_term);
// if _votedfor's hostname_ is empty, the raft meta file(format ":0") could not be parsed.
// make some tricky fix.
if (_votedfor.type_ == PeerId::Type::HostName && _votedfor.hostname_.empty()) {
_votedfor.hostname_.append("localhost:0");
if (_votedfor.type_ == PeerId::Type::HostName && _votedfor.hostname_addr.hostname.empty()) {
_votedfor.hostname_addr = HostNameAddr("localhost", 0);
}
meta.set_votedfor(_votedfor.to_string());

Expand Down
3 changes: 1 addition & 2 deletions src/braft/replicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ int Replicator::start(const ReplicatorOptions& options, ReplicatorId *id) {
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(options.peer_id.hostname_);
HostNameAddr2NSUrl(options.peer_id.hostname_addr, naming_service_url);
if (r->_sending_channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, &channel_opt) != 0) {
LOG(ERROR) << "Fail to init sending channel"
<< ", group " << options.group_id;
Expand Down
Loading

0 comments on commit 0db90ba

Please sign in to comment.