From 7d5a607ec68fc530ec23347f83aae6e1d6c36ddd Mon Sep 17 00:00:00 2001 From: xiaolin0310 Date: Wed, 1 Nov 2023 16:15:17 +0800 Subject: [PATCH] refactor braft to support hostname as PeerId's endpoint * define a struct named HostNameAddr, as an attribute of PeerId * wrap a function to build naming_service_url, which is used for brpc channel init * modify logic about channel init with naming service --- .gitignore | 1 + .../counter_hostname_test/CMakeLists.txt | 12 ++ .../counter/counter_hostname_test/client.cpp | 7 +- .../counter_hostname_test/run_server.sh | 1 + .../counter/counter_hostname_test/server.cpp | 4 +- src/braft/cli.cpp | 21 ++-- src/braft/configuration.h | 103 +++++++++++++----- src/braft/node.cpp | 10 +- src/braft/raft_meta.cpp | 4 +- src/braft/replicator.cpp | 3 +- src/braft/route_table.cpp | 5 +- src/braft/util.h | 11 +- test/test_node.cpp | 6 +- 13 files changed, 120 insertions(+), 68 deletions(-) diff --git a/.gitignore b/.gitignore index ab155bff..3b6d6878 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,4 @@ CTestTestfile.cmake # Github aciton !.github +cmake-build-debug \ No newline at end of file diff --git a/example/counter/counter_hostname_test/CMakeLists.txt b/example/counter/counter_hostname_test/CMakeLists.txt index 42785c8b..4db01b45 100644 --- a/example/counter/counter_hostname_test/CMakeLists.txt +++ b/example/counter/counter_hostname_test/CMakeLists.txt @@ -1,6 +1,7 @@ cmake_minimum_required(VERSION 2.8.10) project(counter C CXX) +option(BRPC_WITH_GLOG "With glog" OFF) option(EXAMPLE_LINK_SO "Whether examples are linked dynamically" OFF) option(LINK_TCMALLOC "Link tcmalloc if possible" ON) @@ -19,6 +20,16 @@ if (NOT PROTOBUF_PROTOC_EXECUTABLE) set (PROTOBUF_PROTOC_EXECUTABLE "${PROTO_LIB_DIR}/../bin/protoc") endif() +if(BRPC_WITH_GLOG) + message(NOTICE "counter BRPC_WITH_GLOG=ON") + find_path(GLOG_INCLUDE_PATH NAMES glog/logging.h) + find_library(GLOG_LIB NAMES glog VERSION ">=0.6.0" REQUIRE) + if((NOT GLOG_INCLUDE_PATH) OR (NOT GLOG_LIB)) + message(FATAL_ERROR "Fail to find glog") + endif() + include_directories(${GLOG_INCLUDE_PATH}) +endif() + protobuf_generate_cpp(PROTO_SRC PROTO_HEADER counter.proto) # include PROTO_HEADER include_directories(${CMAKE_CURRENT_BINARY_DIR}) @@ -112,6 +123,7 @@ add_executable(counter_server server.cpp ${PROTO_SRC} ${PROTO_HEADER}) set(DYNAMIC_LIB ${CMAKE_THREAD_LIBS_INIT} ${GFLAGS_LIBRARY} + ${GLOG_LIB} ${PROTOBUF_LIBRARY} ${GPERFTOOLS_LIBRARIES} ${LEVELDB_LIB} diff --git a/example/counter/counter_hostname_test/client.cpp b/example/counter/counter_hostname_test/client.cpp index f63bc871..4c49a33c 100644 --- a/example/counter/counter_hostname_test/client.cpp +++ b/example/counter/counter_hostname_test/client.cpp @@ -60,9 +60,8 @@ static void* sender(void* arg) { } } else { std::string naming_service_url; - naming_service_url.append(PROTOCOL_PREFIX); - naming_service_url.append(leader.hostname_); - if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) { + HostNameAddr2NSUrl(leader.hostname_addr, naming_service_url); + if (channel.Init(naming_service_url.c_str(), braft::LOAD_BALANCER_NAME, NULL) != 0) { LOG(ERROR) << "Fail to init channel to " << leader; bthread_usleep(FLAGS_timeout_ms * 1000L); continue; @@ -73,7 +72,7 @@ static void* sender(void* arg) { brpc::Controller cntl; cntl.set_timeout_ms(FLAGS_timeout_ms); - // Randomly select which request we want send; + // Randomly select which request we want to send; example::CounterResponse response; if (butil::fast_rand_less_than(100) < (size_t)FLAGS_add_percentage) { diff --git a/example/counter/counter_hostname_test/run_server.sh b/example/counter/counter_hostname_test/run_server.sh index 19903e2b..0ae796d1 100644 --- a/example/counter/counter_hostname_test/run_server.sh +++ b/example/counter/counter_hostname_test/run_server.sh @@ -60,6 +60,7 @@ for ((i=0; i<$FLAGS_server_num; ++i)); do cp ./counter_server runtime/$i cd runtime/$i ${VALGRIND} ./counter_server \ + -node_hostname="${IP}" \ -bthread_concurrency=${FLAGS_bthread_concurrency}\ -crash_on_fatal_log=${FLAGS_crash_on_fatal} \ -raft_max_segment_size=${FLAGS_max_segment_size} \ diff --git a/example/counter/counter_hostname_test/server.cpp b/example/counter/counter_hostname_test/server.cpp index f8a605ee..94cd9a7e 100644 --- a/example/counter/counter_hostname_test/server.cpp +++ b/example/counter/counter_hostname_test/server.cpp @@ -91,9 +91,7 @@ class Counter : public braft::StateMachine { node_options.disable_cli = FLAGS_disable_cli; // braft::Node* node = new braft::Node(FLAGS_group, braft::PeerId(addr)); braft::PeerId node_host(addr); - node_host.hostname_.append(FLAGS_node_hostname); - node_host.hostname_.append(":"); - node_host.hostname_.append(std::to_string(FLAGS_port)); + node_host.hostname_addr = braft::HostNameAddr(FLAGS_node_hostname, FLAGS_port); node_host.type_ = braft::PeerId::Type::HostName; braft::Node* node = new braft::Node(FLAGS_group, node_host); if (node->init(node_options) != 0) { diff --git a/src/braft/cli.cpp b/src/braft/cli.cpp index cf2777ae..7c144bd3 100644 --- a/src/braft/cli.cpp +++ b/src/braft/cli.cpp @@ -42,8 +42,7 @@ static butil::Status get_leader(const GroupId& group_id, const Configuration& co } } else { std::string naming_service_url; - naming_service_url.append(PROTOCOL_PREFIX); - naming_service_url.append(iter->hostname_); + HostNameAddr2NSUrl(iter->hostname_addr, naming_service_url); if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) { return butil::Status(-1, "Fail to init channel to %s", iter->to_string().c_str()); @@ -90,8 +89,7 @@ butil::Status add_peer(const GroupId& group_id, const Configuration& conf, } } else { std::string naming_service_url; - naming_service_url.append(PROTOCOL_PREFIX); - naming_service_url.append(leader_id.hostname_); + HostNameAddr2NSUrl(leader_id.hostname_addr, naming_service_url); if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) { return butil::Status(-1, "Fail to init channel to %s", leader_id.to_string().c_str()); @@ -138,8 +136,7 @@ butil::Status remove_peer(const GroupId& group_id, const Configuration& conf, } } else { std::string naming_service_url; - naming_service_url.append(PROTOCOL_PREFIX); - naming_service_url.append(leader_id.hostname_); + HostNameAddr2NSUrl(leader_id.hostname_addr, naming_service_url); if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) { return butil::Status(-1, "Fail to init channel to %s", leader_id.to_string().c_str()); @@ -187,8 +184,7 @@ butil::Status reset_peer(const GroupId& group_id, const PeerId& peer_id, } } else { std::string naming_service_url; - naming_service_url.append(PROTOCOL_PREFIX); - naming_service_url.append(peer_id.hostname_); + HostNameAddr2NSUrl(peer_id.hostname_addr, naming_service_url); if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) { return butil::Status(-1, "Fail to init channel to %s", peer_id.to_string().c_str()); @@ -223,8 +219,7 @@ butil::Status snapshot(const GroupId& group_id, const PeerId& peer_id, } } else { std::string naming_service_url; - naming_service_url.append(PROTOCOL_PREFIX); - naming_service_url.append(peer_id.hostname_); + HostNameAddr2NSUrl(peer_id.hostname_addr, naming_service_url); if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) { return butil::Status(-1, "Fail to init channel to %s", peer_id.to_string().c_str()); @@ -261,8 +256,7 @@ butil::Status change_peers(const GroupId& group_id, const Configuration& conf, } } else { std::string naming_service_url; - naming_service_url.append(PROTOCOL_PREFIX); - naming_service_url.append(leader_id.hostname_); + HostNameAddr2NSUrl(leader_id.hostname_addr, naming_service_url); if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) { return butil::Status(-1, "Fail to init channel to %s", leader_id.to_string().c_str()); @@ -317,8 +311,7 @@ butil::Status transfer_leader(const GroupId& group_id, const Configuration& conf } } else { std::string naming_service_url; - naming_service_url.append(PROTOCOL_PREFIX); - naming_service_url.append(leader_id.hostname_); + HostNameAddr2NSUrl(leader_id.hostname_addr, naming_service_url); if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) { return butil::Status(-1, "Fail to init channel to %s", leader_id.to_string().c_str()); diff --git a/src/braft/configuration.h b/src/braft/configuration.h index 427b125a..0e514c74 100644 --- a/src/braft/configuration.h +++ b/src/braft/configuration.h @@ -39,13 +39,68 @@ enum Role { WITNESS = 1, }; +struct HostNameAddr { + HostNameAddr() : hostname(""), port(0) {} + explicit HostNameAddr(const std::string& hostname_, uint16_t port_) : hostname(hostname_), port(port_) { + if (port_ > 65535) { + hostname.clear(); + port = 0; + } + } + + HostNameAddr(const HostNameAddr& rhs) = default; + HostNameAddr(HostNameAddr&& rhs) = default; + HostNameAddr& operator=(const HostNameAddr& addr) = default; + HostNameAddr& operator=(HostNameAddr&& addr) noexcept { + if(&addr == this) { + return *this; + } + hostname = std::move(addr.hostname); + port = addr.port; + return *this; + } + + void reset() { + hostname.clear(); + port = 0; + } + + std::string to_string() const; + + std::string hostname; + uint16_t port; +}; + +inline bool operator<(const HostNameAddr& addr1, const HostNameAddr& addr2) { + return (addr1.hostname != addr2.hostname) ? (addr1.hostname < addr2.hostname) : (addr1.port < addr2.port); +} + +inline bool operator==(const HostNameAddr& addr1, const HostNameAddr& addr2) { + return addr1.hostname == addr2.hostname && addr1.port == addr2.port; +} + +inline bool operator!=(const HostNameAddr& addr1, const HostNameAddr& addr2) { + return !(addr1 == addr2); +} + +inline std::ostream& operator<<(std::ostream& os, const HostNameAddr& addr) { + return os << addr.hostname << ":" << addr.port; +} + +inline std::string HostNameAddr::to_string() const { + std::ostringstream oss; + oss << *this; + return oss.str(); +} + + // Represent a participant in a replicating group. // Conf like: 172-17-0-1.default.pod.cluster.local:8002:0,172-17-0-2.default.pod.cluster.local:8002:0,172-17-0-3.default.pod.cluster.local:8002:0 struct PeerId { butil::EndPoint addr; // ip+port. int idx; // idx in same addr, default 0 Role role = REPLICA; - std::string hostname_; // hostname+port, e.g. www.foo.com:8765 + HostNameAddr hostname_addr; // hostname+port. enum class Type { EndPoint = 0, HostName @@ -62,24 +117,17 @@ struct PeerId { } /*intended implicit*/PeerId(const std::string& str) { CHECK_EQ(0, parse(str)); } - PeerId(const PeerId& id) : addr(id.addr), idx(id.idx), role(id.role), hostname_(id.hostname_), type_(id.type_) {} - PeerId(PeerId&& id) : addr(std::move(id.addr)), idx(std::move(id.idx)), role(std::move(id.role)), hostname_(std::move(id.hostname_)), - type_(std::move(id.type_)) {} - - PeerId& operator=(const PeerId& id) { - addr = id.addr; - idx = id.idx; - hostname_ = id.hostname_; - type_ = id.type_; - role = id.role; + PeerId(const PeerId& id) = default; + PeerId(PeerId&& id) = default; + PeerId& operator=(const PeerId& id) = default; - return *this; - } - - PeerId& operator=(PeerId&& id) { + PeerId& operator=(PeerId&& id) noexcept { + if ( &id == this) { + return *this; + } addr = std::move(id.addr); idx = std::move(id.idx); - hostname_ = std::move(id.hostname_); + hostname_addr = std::move(id.hostname_addr); type_ = std::move(id.type_); role = std::move(id.role); @@ -92,7 +140,7 @@ struct PeerId { addr.port = 0; } else { - hostname_.clear(); + hostname_addr.reset(); } idx = 0; role = REPLICA; @@ -102,7 +150,7 @@ struct PeerId { if (type_ == Type::EndPoint) { return (addr.ip == butil::IP_ANY && addr.port == 0 && idx == 0); } else { - return (hostname_.empty() && idx == 0); + return (hostname_addr.hostname.empty() && hostname_addr.port == 0 && idx == 0); } } bool is_witness() const { @@ -124,9 +172,8 @@ struct PeerId { } if (0 != butil::str2ip(temp_str, &addr.ip)) { type_ = Type::HostName; - hostname_.append(temp_str); - hostname_.append(":"); - hostname_.append(std::to_string(port)); + hostname_addr.hostname = temp_str; + hostname_addr.port = port; } else { type_ = Type::EndPoint; addr.port = port; @@ -139,7 +186,7 @@ struct PeerId { if (type_ == Type::EndPoint) { snprintf(str, sizeof(str), "%s:%d:%d", butil::endpoint2str(addr).c_str(), idx, int(role)); } else { - snprintf(str, sizeof(str), "%s:%d:%d", hostname_.c_str(), idx, int(role)); + snprintf(str, sizeof(str), "%s:%d:%d", hostname_addr.to_string().c_str(), idx, int(role)); } return std::string(str); } @@ -150,13 +197,13 @@ inline bool operator<(const PeerId& id1, const PeerId& id2) { if (id1.type_ != id2.type_) { LOG(WARNING) << "PeerId id1 and PeerId id2 do not have same type(IP Addr or Hostname)."; if (id1.type_ == PeerId::Type::EndPoint) { - if (strcmp(butil::endpoint2str(id1.addr).c_str(), id2.hostname_.c_str()) < 0) { + if (strcmp(butil::endpoint2str(id1.addr).c_str(), id2.hostname_addr.to_string().c_str()) < 0) { return true; } else { return false; } } else { - if (strcmp(id1.hostname_.c_str(), butil::endpoint2str(id2.addr).c_str()) < 0) { + if (strcmp(id1.hostname_addr.to_string().c_str(), butil::endpoint2str(id2.addr).c_str()) < 0) { return true; } else { return false; @@ -170,10 +217,10 @@ inline bool operator<(const PeerId& id1, const PeerId& id2) { return id1.addr == id2.addr && id1.idx < id2.idx; } } else { - if (id1.hostname_ < id2.hostname_) { + if (id1.hostname_addr < id2.hostname_addr) { return true; } else { - return id1.hostname_ == id2.hostname_ && id1.idx < id2.idx; + return id1.hostname_addr == id2.hostname_addr && id1.idx < id2.idx; } } } @@ -186,7 +233,7 @@ inline bool operator==(const PeerId& id1, const PeerId& id2) { if (id1.type_ == PeerId::Type::EndPoint) { return (id1.addr == id2.addr && id1.idx == id2.idx); } else { - return (id1.hostname_ == id2.hostname_ && id1.idx == id2.idx); + return (id1.hostname_addr == id2.hostname_addr && id1.idx == id2.idx); } } @@ -198,7 +245,7 @@ inline std::ostream& operator << (std::ostream& os, const PeerId& id) { if (id.type_ == PeerId::Type::EndPoint) { return os << id.addr << ':' << id.idx << ':' << int(id.role); } else { - return os << id.hostname_ << ':' << id.idx << ':' << int(id.role); + return os << id.hostname_addr << ':' << id.idx << ':' << int(id.role); } } diff --git a/src/braft/node.cpp b/src/braft/node.cpp index 687e46a4..0b466790 100644 --- a/src/braft/node.cpp +++ b/src/braft/node.cpp @@ -1662,11 +1662,10 @@ void NodeImpl::pre_vote(std::unique_lock* lck, bool triggered) { } } else { std::string naming_service_url; - naming_service_url.append(PROTOCOL_PREFIX); - naming_service_url.append(iter->hostname_); + HostNameAddr2NSUrl(iter->hostname_addr, naming_service_url); if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, &options) != 0) { LOG(WARNING) << "node " << _group_id << ":" << _server_id - << " channel init failed, hostname " << iter->hostname_; + << " channel init failed, hostname " << iter->hostname_addr; continue; } } @@ -1778,11 +1777,10 @@ void NodeImpl::request_peers_to_vote(const std::set& peers, } } else { std::string naming_service_url; - naming_service_url.append(PROTOCOL_PREFIX); - naming_service_url.append(iter->hostname_); + HostNameAddr2NSUrl(iter->hostname_addr, naming_service_url); if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, &options) != 0) { LOG(WARNING) << "node " << _group_id << ":" << _server_id - << " channel init failed, addr " << iter->hostname_; + << " channel init failed, addr " << iter->hostname_addr; continue; } } diff --git a/src/braft/raft_meta.cpp b/src/braft/raft_meta.cpp index 065888c7..0cc8938d 100644 --- a/src/braft/raft_meta.cpp +++ b/src/braft/raft_meta.cpp @@ -533,8 +533,8 @@ int FileBasedSingleMetaStorage::save() { meta.set_term(_term); // if _votedfor's hostname_ is empty, the raft meta file(format ":0") could not be parsed. // make some tricky fix. - if (_votedfor.type_ == PeerId::Type::HostName && _votedfor.hostname_.empty()) { - _votedfor.hostname_.append("localhost:0"); + if (_votedfor.type_ == PeerId::Type::HostName && _votedfor.hostname_addr.hostname.empty()) { + _votedfor.hostname_addr = HostNameAddr("localhost", 0); } meta.set_votedfor(_votedfor.to_string()); diff --git a/src/braft/replicator.cpp b/src/braft/replicator.cpp index 6e78493e..29d81d2c 100644 --- a/src/braft/replicator.cpp +++ b/src/braft/replicator.cpp @@ -125,8 +125,7 @@ int Replicator::start(const ReplicatorOptions& options, ReplicatorId *id) { } } else { std::string naming_service_url; - naming_service_url.append(PROTOCOL_PREFIX); - naming_service_url.append(options.peer_id.hostname_); + HostNameAddr2NSUrl(options.peer_id.hostname_addr, naming_service_url); if (r->_sending_channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, &channel_opt) != 0) { LOG(ERROR) << "Fail to init sending channel" << ", group " << options.group_id; diff --git a/src/braft/route_table.cpp b/src/braft/route_table.cpp index 9f79c66a..b7c78c27 100644 --- a/src/braft/route_table.cpp +++ b/src/braft/route_table.cpp @@ -214,12 +214,11 @@ butil::Status refresh_leader(const GroupId& group, int timeout_ms) { } } else { std::string naming_service_url; - naming_service_url.append(PROTOCOL_PREFIX); - naming_service_url.append(iter->hostname_); + HostNameAddr2NSUrl(iter->hostname_addr, naming_service_url); auto [success, chan] = rtb->InitAndGetChannelTo(naming_service_url); if (!success) { error.set_error(-1, "Fail to init HostName channel to %s", - iter->hostname_.c_str()); + iter->hostname_addr.to_string().c_str()); continue; } chan_ptr = chan; diff --git a/src/braft/util.h b/src/braft/util.h index 902914f1..17ff0891 100644 --- a/src/braft/util.h +++ b/src/braft/util.h @@ -43,6 +43,7 @@ #include #include "braft/macros.h" #include "braft/raft.h" +#include "braft/configuration.h" namespace bvar { namespace detail { @@ -172,9 +173,15 @@ std::ostream& operator<<(std::ostream& os, const CounterRecorder&); namespace braft { class Closure; -// for Brpc Channel Init Naming Service API -inline const char *PROTOCOL_PREFIX = "http://"; +// for brpc channel init naming service api inline const char *LOAD_BALANCER_NAME = "rr"; +inline const char *HTTP_SCHEMA = "http://"; + +// naming_service_url format: http://hostname:port +inline void HostNameAddr2NSUrl(const HostNameAddr &hostname_addr, std::string &ns_url) { + ns_url.append(HTTP_SCHEMA); + ns_url.append(hostname_addr.to_string()); +} // http://stackoverflow.com/questions/1493936/faster-approach-to-checking-for-an-all-zero-buffer-in-c inline bool is_zero(const char* buff, const size_t size) { diff --git a/test/test_node.cpp b/test/test_node.cpp index c90454d6..fb21f6b6 100644 --- a/test/test_node.cpp +++ b/test/test_node.cpp @@ -272,8 +272,7 @@ TEST_P(NodeTest, TripleNode) { } } else { std::string naming_service_url; - naming_service_url.append(PROTOCOL_PREFIX); - naming_service_url.append(leader->node_id().peer_id.hostname_); + HostNameAddr2NSUrl(leader->node_id().peer_id.hostname_addr, naming_service_url); if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, &options) != 0) { LOG(ERROR) << "Fail to initialize channel"; } @@ -717,8 +716,7 @@ TEST_P(NodeTest, Leader_step_down_during_install_snapshot) { } } else { std::string naming_service_url; - naming_service_url.append(PROTOCOL_PREFIX); - naming_service_url.append(leader->node_id().peer_id.hostname_); + HostNameAddr2NSUrl(leader->node_id().peer_id.hostname_addr, naming_service_url); if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, &options) != 0) { LOG(ERROR) << "Fail to initialize channel"; }