Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor braft to support hostname as PeerId's endpoint #4

Merged
merged 1 commit into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ CTestTestfile.cmake

# Github aciton
!.github
cmake-build-debug
12 changes: 12 additions & 0 deletions example/counter/counter_hostname_test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
cmake_minimum_required(VERSION 2.8.10)
project(counter C CXX)

option(BRPC_WITH_GLOG "With glog" OFF)
option(EXAMPLE_LINK_SO "Whether examples are linked dynamically" OFF)
option(LINK_TCMALLOC "Link tcmalloc if possible" ON)

Expand All @@ -19,6 +20,16 @@ if (NOT PROTOBUF_PROTOC_EXECUTABLE)
set (PROTOBUF_PROTOC_EXECUTABLE "${PROTO_LIB_DIR}/../bin/protoc")
endif()

if(BRPC_WITH_GLOG)
message(NOTICE "counter BRPC_WITH_GLOG=ON")
find_path(GLOG_INCLUDE_PATH NAMES glog/logging.h)
find_library(GLOG_LIB NAMES glog VERSION ">=0.6.0" REQUIRE)
if((NOT GLOG_INCLUDE_PATH) OR (NOT GLOG_LIB))
message(FATAL_ERROR "Fail to find glog")
endif()
include_directories(${GLOG_INCLUDE_PATH})
endif()

protobuf_generate_cpp(PROTO_SRC PROTO_HEADER counter.proto)
# include PROTO_HEADER
include_directories(${CMAKE_CURRENT_BINARY_DIR})
Expand Down Expand Up @@ -112,6 +123,7 @@ add_executable(counter_server server.cpp ${PROTO_SRC} ${PROTO_HEADER})
set(DYNAMIC_LIB
${CMAKE_THREAD_LIBS_INIT}
${GFLAGS_LIBRARY}
${GLOG_LIB}
${PROTOBUF_LIBRARY}
${GPERFTOOLS_LIBRARIES}
${LEVELDB_LIB}
Expand Down
7 changes: 3 additions & 4 deletions example/counter/counter_hostname_test/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ static void* sender(void* arg) {
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(leader.hostname_);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) {
HostNameAddr2NSUrl(leader.hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), braft::LOAD_BALANCER_NAME, NULL) != 0) {
LOG(ERROR) << "Fail to init channel to " << leader;
bthread_usleep(FLAGS_timeout_ms * 1000L);
continue;
Expand All @@ -73,7 +72,7 @@ static void* sender(void* arg) {

brpc::Controller cntl;
cntl.set_timeout_ms(FLAGS_timeout_ms);
// Randomly select which request we want send;
// Randomly select which request we want to send;
example::CounterResponse response;

if (butil::fast_rand_less_than(100) < (size_t)FLAGS_add_percentage) {
Expand Down
1 change: 1 addition & 0 deletions example/counter/counter_hostname_test/run_server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ for ((i=0; i<$FLAGS_server_num; ++i)); do
cp ./counter_server runtime/$i
cd runtime/$i
${VALGRIND} ./counter_server \
-node_hostname="${IP}" \
-bthread_concurrency=${FLAGS_bthread_concurrency}\
-crash_on_fatal_log=${FLAGS_crash_on_fatal} \
-raft_max_segment_size=${FLAGS_max_segment_size} \
Expand Down
4 changes: 1 addition & 3 deletions example/counter/counter_hostname_test/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ class Counter : public braft::StateMachine {
node_options.disable_cli = FLAGS_disable_cli;
// braft::Node* node = new braft::Node(FLAGS_group, braft::PeerId(addr));
braft::PeerId node_host(addr);
node_host.hostname_.append(FLAGS_node_hostname);
node_host.hostname_.append(":");
node_host.hostname_.append(std::to_string(FLAGS_port));
node_host.hostname_addr = braft::HostNameAddr(FLAGS_node_hostname, FLAGS_port);
node_host.type_ = braft::PeerId::Type::HostName;
braft::Node* node = new braft::Node(FLAGS_group, node_host);
if (node->init(node_options) != 0) {
Expand Down
21 changes: 7 additions & 14 deletions src/braft/cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ static butil::Status get_leader(const GroupId& group_id, const Configuration& co
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(iter->hostname_);
HostNameAddr2NSUrl(iter->hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
iter->to_string().c_str());
Expand Down Expand Up @@ -90,8 +89,7 @@ butil::Status add_peer(const GroupId& group_id, const Configuration& conf,
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(leader_id.hostname_);
HostNameAddr2NSUrl(leader_id.hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
leader_id.to_string().c_str());
Expand Down Expand Up @@ -138,8 +136,7 @@ butil::Status remove_peer(const GroupId& group_id, const Configuration& conf,
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(leader_id.hostname_);
HostNameAddr2NSUrl(leader_id.hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
leader_id.to_string().c_str());
Expand Down Expand Up @@ -187,8 +184,7 @@ butil::Status reset_peer(const GroupId& group_id, const PeerId& peer_id,
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(peer_id.hostname_);
HostNameAddr2NSUrl(peer_id.hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
peer_id.to_string().c_str());
Expand Down Expand Up @@ -223,8 +219,7 @@ butil::Status snapshot(const GroupId& group_id, const PeerId& peer_id,
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(peer_id.hostname_);
HostNameAddr2NSUrl(peer_id.hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
peer_id.to_string().c_str());
Expand Down Expand Up @@ -261,8 +256,7 @@ butil::Status change_peers(const GroupId& group_id, const Configuration& conf,
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(leader_id.hostname_);
HostNameAddr2NSUrl(leader_id.hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
leader_id.to_string().c_str());
Expand Down Expand Up @@ -317,8 +311,7 @@ butil::Status transfer_leader(const GroupId& group_id, const Configuration& conf
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(leader_id.hostname_);
HostNameAddr2NSUrl(leader_id.hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
leader_id.to_string().c_str());
Expand Down
103 changes: 75 additions & 28 deletions src/braft/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,68 @@ enum Role {
WITNESS = 1,
};

struct HostNameAddr {
HostNameAddr() : hostname(""), port(0) {}
explicit HostNameAddr(const std::string& hostname_, uint16_t port_) : hostname(hostname_), port(port_) {
if (port_ > 65535) {
hostname.clear();
port = 0;
}
}

HostNameAddr(const HostNameAddr& rhs) = default;
HostNameAddr(HostNameAddr&& rhs) = default;
HostNameAddr& operator=(const HostNameAddr& addr) = default;
HostNameAddr& operator=(HostNameAddr&& addr) noexcept {
if(&addr == this) {
return *this;
}
hostname = std::move(addr.hostname);
port = addr.port;
return *this;
}

void reset() {
hostname.clear();
port = 0;
}

std::string to_string() const;

std::string hostname;
uint16_t port;
};

inline bool operator<(const HostNameAddr& addr1, const HostNameAddr& addr2) {
return (addr1.hostname != addr2.hostname) ? (addr1.hostname < addr2.hostname) : (addr1.port < addr2.port);
}

inline bool operator==(const HostNameAddr& addr1, const HostNameAddr& addr2) {
return addr1.hostname == addr2.hostname && addr1.port == addr2.port;
}

inline bool operator!=(const HostNameAddr& addr1, const HostNameAddr& addr2) {
return !(addr1 == addr2);
}

inline std::ostream& operator<<(std::ostream& os, const HostNameAddr& addr) {
return os << addr.hostname << ":" << addr.port;
}

inline std::string HostNameAddr::to_string() const {
std::ostringstream oss;
oss << *this;
return oss.str();
}


// Represent a participant in a replicating group.
// Conf like: 172-17-0-1.default.pod.cluster.local:8002:0,172-17-0-2.default.pod.cluster.local:8002:0,172-17-0-3.default.pod.cluster.local:8002:0
struct PeerId {
butil::EndPoint addr; // ip+port.
int idx; // idx in same addr, default 0
Role role = REPLICA;
std::string hostname_; // hostname+port, e.g. www.foo.com:8765
HostNameAddr hostname_addr; // hostname+port.
enum class Type {
EndPoint = 0,
HostName
Expand All @@ -62,24 +117,17 @@ struct PeerId {
}
/*intended implicit*/PeerId(const std::string& str)
{ CHECK_EQ(0, parse(str)); }
PeerId(const PeerId& id) : addr(id.addr), idx(id.idx), role(id.role), hostname_(id.hostname_), type_(id.type_) {}
PeerId(PeerId&& id) : addr(std::move(id.addr)), idx(std::move(id.idx)), role(std::move(id.role)), hostname_(std::move(id.hostname_)),
type_(std::move(id.type_)) {}

PeerId& operator=(const PeerId& id) {
addr = id.addr;
idx = id.idx;
hostname_ = id.hostname_;
type_ = id.type_;
role = id.role;
PeerId(const PeerId& id) = default;
PeerId(PeerId&& id) = default;
PeerId& operator=(const PeerId& id) = default;

return *this;
}

PeerId& operator=(PeerId&& id) {
PeerId& operator=(PeerId&& id) noexcept {
if ( &id == this) {
return *this;
}
addr = std::move(id.addr);
idx = std::move(id.idx);
hostname_ = std::move(id.hostname_);
hostname_addr = std::move(id.hostname_addr);
type_ = std::move(id.type_);
role = std::move(id.role);

Expand All @@ -92,7 +140,7 @@ struct PeerId {
addr.port = 0;
}
else {
hostname_.clear();
hostname_addr.reset();
}
idx = 0;
role = REPLICA;
Expand All @@ -102,7 +150,7 @@ struct PeerId {
if (type_ == Type::EndPoint) {
return (addr.ip == butil::IP_ANY && addr.port == 0 && idx == 0);
} else {
return (hostname_.empty() && idx == 0);
return (hostname_addr.hostname.empty() && hostname_addr.port == 0 && idx == 0);
}
}
bool is_witness() const {
Expand All @@ -124,9 +172,8 @@ struct PeerId {
}
if (0 != butil::str2ip(temp_str, &addr.ip)) {
type_ = Type::HostName;
hostname_.append(temp_str);
hostname_.append(":");
hostname_.append(std::to_string(port));
hostname_addr.hostname = temp_str;
hostname_addr.port = port;
} else {
type_ = Type::EndPoint;
addr.port = port;
Expand All @@ -139,7 +186,7 @@ struct PeerId {
if (type_ == Type::EndPoint) {
snprintf(str, sizeof(str), "%s:%d:%d", butil::endpoint2str(addr).c_str(), idx, int(role));
} else {
snprintf(str, sizeof(str), "%s:%d:%d", hostname_.c_str(), idx, int(role));
snprintf(str, sizeof(str), "%s:%d:%d", hostname_addr.to_string().c_str(), idx, int(role));
}
return std::string(str);
}
Expand All @@ -150,13 +197,13 @@ inline bool operator<(const PeerId& id1, const PeerId& id2) {
if (id1.type_ != id2.type_) {
LOG(WARNING) << "PeerId id1 and PeerId id2 do not have same type(IP Addr or Hostname).";
if (id1.type_ == PeerId::Type::EndPoint) {
if (strcmp(butil::endpoint2str(id1.addr).c_str(), id2.hostname_.c_str()) < 0) {
if (strcmp(butil::endpoint2str(id1.addr).c_str(), id2.hostname_addr.to_string().c_str()) < 0) {
return true;
} else {
return false;
}
} else {
if (strcmp(id1.hostname_.c_str(), butil::endpoint2str(id2.addr).c_str()) < 0) {
if (strcmp(id1.hostname_addr.to_string().c_str(), butil::endpoint2str(id2.addr).c_str()) < 0) {
return true;
} else {
return false;
Expand All @@ -170,10 +217,10 @@ inline bool operator<(const PeerId& id1, const PeerId& id2) {
return id1.addr == id2.addr && id1.idx < id2.idx;
}
} else {
if (id1.hostname_ < id2.hostname_) {
if (id1.hostname_addr < id2.hostname_addr) {
return true;
} else {
return id1.hostname_ == id2.hostname_ && id1.idx < id2.idx;
return id1.hostname_addr == id2.hostname_addr && id1.idx < id2.idx;
}
}
}
Expand All @@ -186,7 +233,7 @@ inline bool operator==(const PeerId& id1, const PeerId& id2) {
if (id1.type_ == PeerId::Type::EndPoint) {
return (id1.addr == id2.addr && id1.idx == id2.idx);
} else {
return (id1.hostname_ == id2.hostname_ && id1.idx == id2.idx);
return (id1.hostname_addr == id2.hostname_addr && id1.idx == id2.idx);
}
}

Expand All @@ -198,7 +245,7 @@ inline std::ostream& operator << (std::ostream& os, const PeerId& id) {
if (id.type_ == PeerId::Type::EndPoint) {
return os << id.addr << ':' << id.idx << ':' << int(id.role);
} else {
return os << id.hostname_ << ':' << id.idx << ':' << int(id.role);
return os << id.hostname_addr << ':' << id.idx << ':' << int(id.role);
}
}

Expand Down
10 changes: 4 additions & 6 deletions src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1662,11 +1662,10 @@ void NodeImpl::pre_vote(std::unique_lock<raft_mutex_t>* lck, bool triggered) {
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(iter->hostname_);
HostNameAddr2NSUrl(iter->hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, &options) != 0) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " channel init failed, hostname " << iter->hostname_;
<< " channel init failed, hostname " << iter->hostname_addr;
continue;
}
}
Expand Down Expand Up @@ -1778,11 +1777,10 @@ void NodeImpl::request_peers_to_vote(const std::set<PeerId>& peers,
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(iter->hostname_);
HostNameAddr2NSUrl(iter->hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, &options) != 0) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " channel init failed, addr " << iter->hostname_;
<< " channel init failed, addr " << iter->hostname_addr;
continue;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/braft/raft_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,8 @@ int FileBasedSingleMetaStorage::save() {
meta.set_term(_term);
// if _votedfor's hostname_ is empty, the raft meta file(format ":0") could not be parsed.
// make some tricky fix.
if (_votedfor.type_ == PeerId::Type::HostName && _votedfor.hostname_.empty()) {
_votedfor.hostname_.append("localhost:0");
if (_votedfor.type_ == PeerId::Type::HostName && _votedfor.hostname_addr.hostname.empty()) {
_votedfor.hostname_addr = HostNameAddr("localhost", 0);
}
meta.set_votedfor(_votedfor.to_string());

Expand Down
3 changes: 1 addition & 2 deletions src/braft/replicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ int Replicator::start(const ReplicatorOptions& options, ReplicatorId *id) {
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(options.peer_id.hostname_);
HostNameAddr2NSUrl(options.peer_id.hostname_addr, naming_service_url);
if (r->_sending_channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, &channel_opt) != 0) {
LOG(ERROR) << "Fail to init sending channel"
<< ", group " << options.group_id;
Expand Down
Loading
Loading