Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make braft PeerId support hostname/dns. #419

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ CTestTestfile.cmake

# Github aciton
!.github
cmake-build-debug
146 changes: 146 additions & 0 deletions example/counter/counter_hostname_test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
cmake_minimum_required(VERSION 2.8.10)
project(counter C CXX)

option(BRPC_WITH_GLOG "With glog" OFF)
option(EXAMPLE_LINK_SO "Whether examples are linked dynamically" OFF)
option(LINK_TCMALLOC "Link tcmalloc if possible" ON)

execute_process(
COMMAND bash -c "find ${CMAKE_SOURCE_DIR}/../.. -type d -path \"*output/include/braft\" | xargs dirname | xargs dirname | tr -d '\n'"
OUTPUT_VARIABLE OUTPUT_PATH
)

set(CMAKE_PREFIX_PATH ${OUTPUT_PATH})

include(FindThreads)
include(FindProtobuf)

if (NOT PROTOBUF_PROTOC_EXECUTABLE)
get_filename_component(PROTO_LIB_DIR ${PROTOBUF_LIBRARY} DIRECTORY)
set (PROTOBUF_PROTOC_EXECUTABLE "${PROTO_LIB_DIR}/../bin/protoc")
endif()

if(BRPC_WITH_GLOG)
message(NOTICE "counter BRPC_WITH_GLOG=ON")
find_path(GLOG_INCLUDE_PATH NAMES glog/logging.h)
find_library(GLOG_LIB NAMES glog VERSION ">=0.6.0" REQUIRE)
if((NOT GLOG_INCLUDE_PATH) OR (NOT GLOG_LIB))
message(FATAL_ERROR "Fail to find glog")
endif()
include_directories(${GLOG_INCLUDE_PATH})
endif()

protobuf_generate_cpp(PROTO_SRC PROTO_HEADER counter.proto)
# include PROTO_HEADER
include_directories(${CMAKE_CURRENT_BINARY_DIR})

find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h)
if(EXAMPLE_LINK_SO)
find_library(BRPC_LIB NAMES brpc)
find_library(BRAFT_LIB NAMES braft)
else()
find_library(BRPC_LIB NAMES libbrpc.a brpc)
find_library(BRAFT_LIB NAMES libbraft.a braft)
endif()

if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB))
message(FATAL_ERROR "Fail to find brpc")
endif()
include_directories(${BRPC_INCLUDE_PATH})

find_path(BRAFT_INCLUDE_PATH NAMES braft/raft.h)
if ((NOT BRAFT_INCLUDE_PATH) OR (NOT BRAFT_LIB))
message (FATAL_ERROR "Fail to find braft")
endif()
include_directories(${BRAFT_INCLUDE_PATH})

find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h)
find_library(GFLAGS_LIBRARY NAMES gflags libgflags)
if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY))
message(FATAL_ERROR "Fail to find gflags")
endif()
include_directories(${GFLAGS_INCLUDE_PATH})

execute_process(
COMMAND bash -c "grep \"namespace [_A-Za-z0-9]\\+ {\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $2}' | tr -d '\n'"
OUTPUT_VARIABLE GFLAGS_NS
)
if(${GFLAGS_NS} STREQUAL "GFLAGS_NAMESPACE")
execute_process(
COMMAND bash -c "grep \"#define GFLAGS_NAMESPACE [_A-Za-z0-9]\\+\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $3}' | tr -d '\n'"
OUTPUT_VARIABLE GFLAGS_NS
)
endif()

if (LINK_TCMALLOC)
find_path(GPERFTOOLS_INCLUDE_DIR NAMES gperftools/heap-profiler.h)
find_library(GPERFTOOLS_LIBRARIES NAMES tcmalloc_and_profiler)
if (GPERFTOOLS_INCLUDE_DIR AND GPERFTOOLS_LIBRARIES)
set(CMAKE_CXX_FLAGS "-DBRPC_ENABLE_CPU_PROFILER")
include_directories(${GPERFTOOLS_INCLUDE_DIR})
else ()
set (GPERFTOOLS_LIBRARIES "")
endif ()
endif ()

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CMAKE_CPP_FLAGS} -DGFLAGS_NS=${GFLAGS_NS} -DNDEBUG -O2 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer")
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
# require at least gcc 4.8
if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 4.8)
message(FATAL_ERROR "GCC is too old, please install a newer version supporting C++11")
endif()
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
# require at least clang 3.3
if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 3.3)
message(FATAL_ERROR "Clang is too old, please install a newer version supporting C++11")
endif()
else()
message(WARNING "You are using an unsupported compiler! Compilation has only been tested with Clang and GCC.")
endif()

if(CMAKE_VERSION VERSION_LESS "3.1.3")
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
endif()
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
endif()
else()
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
endif()

find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h)
find_library(LEVELDB_LIB NAMES leveldb)
if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB))
message(FATAL_ERROR "Fail to find leveldb")
endif()
include_directories(${LEVELDB_INCLUDE_PATH})

add_executable(counter_client client.cpp ${PROTO_SRC} ${PROTO_HEADER})
add_executable(counter_server server.cpp ${PROTO_SRC} ${PROTO_HEADER})

set(DYNAMIC_LIB
${CMAKE_THREAD_LIBS_INIT}
${GFLAGS_LIBRARY}
${GLOG_LIB}
${PROTOBUF_LIBRARY}
${GPERFTOOLS_LIBRARIES}
${LEVELDB_LIB}
${BRAFT_LIB}
${BRPC_LIB}
rt
ssl
crypto
dl
z
)

target_link_libraries(counter_client
"-Xlinker \"-(\""
${DYNAMIC_LIB}
"-Xlinker \"-)\"")
target_link_libraries(counter_server
"-Xlinker \"-(\""
${DYNAMIC_LIB}
"-Xlinker \"-)\"")
162 changes: 162 additions & 0 deletions example/counter/counter_hostname_test/client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <gflags/gflags.h>
#include <bthread/bthread.h>
#include <brpc/channel.h>
#include <brpc/controller.h>
#include <braft/raft.h>
#include <braft/util.h>
#include <braft/route_table.h>
#include "counter.pb.h"

DEFINE_bool(log_each_request, false, "Print log for each request");
DEFINE_bool(use_bthread, false, "Use bthread to send requests");
DEFINE_int32(add_percentage, 100, "Percentage of fetch_add");
DEFINE_int64(added_by, 1, "Num added to each peer");
DEFINE_int32(thread_num, 1, "Number of threads sending requests");
DEFINE_int32(timeout_ms, 1000, "Timeout for each request");
DEFINE_string(conf, "", "Configuration of the raft group");
DEFINE_string(group, "Counter", "Id of the replication group");

bvar::LatencyRecorder g_latency_recorder("counter_client");

static void* sender(void* arg) {
while (!brpc::IsAskedToQuit()) {
braft::PeerId leader;
// Select leader of the target group from RouteTable
if (braft::rtb::select_leader(FLAGS_group, &leader) != 0) {
// Leader is unknown in RouteTable. Ask RouteTable to refresh leader
// by sending RPCs.
butil::Status st = braft::rtb::refresh_leader(
FLAGS_group, FLAGS_timeout_ms);
if (!st.ok()) {
// Not sure about the leader, sleep for a while and the ask again.
LOG(WARNING) << "Fail to refresh_leader : " << st;
bthread_usleep(FLAGS_timeout_ms * 1000L);
}
continue;
}

// Now we known who is the leader, construct Stub and then sending
// rpc
brpc::Channel channel;
if (leader.type_ == braft::PeerId::Type::EndPoint) {
if (channel.Init(leader.addr, NULL) != 0) {
LOG(ERROR) << "Fail to init channel to " << leader;
bthread_usleep(FLAGS_timeout_ms * 1000L);
continue;
}
} else {
std::string naming_service_url;
HostNameAddr2NSUrl(leader.hostname_addr, naming_service_url);
if (channel.Init(naming_service_url.c_str(), braft::LOAD_BALANCER_NAME, NULL) != 0) {
LOG(ERROR) << "Fail to init channel to " << leader;
bthread_usleep(FLAGS_timeout_ms * 1000L);
continue;
}

}
example::CounterService_Stub stub(&channel);

brpc::Controller cntl;
cntl.set_timeout_ms(FLAGS_timeout_ms);
// Randomly select which request we want to send;
example::CounterResponse response;

if (butil::fast_rand_less_than(100) < (size_t)FLAGS_add_percentage) {
example::FetchAddRequest request;
request.set_value(FLAGS_added_by);
stub.fetch_add(&cntl, &request, &response, NULL);
} else {
example::GetRequest request;
stub.get(&cntl, &request, &response, NULL);
}
if (cntl.Failed()) {
LOG(WARNING) << "Fail to send request to " << leader
<< " : " << cntl.ErrorText();
// Clear leadership since this RPC failed.
braft::rtb::update_leader(FLAGS_group, braft::PeerId());
bthread_usleep(FLAGS_timeout_ms * 1000L);
continue;
}
if (!response.success()) {
LOG(WARNING) << "Fail to send request to " << leader
<< ", redirecting to "
<< (response.has_redirect()
? response.redirect() : "nowhere");
// Update route table since we have redirect information
braft::rtb::update_leader(FLAGS_group, response.redirect());
continue;
}
g_latency_recorder << cntl.latency_us();
if (FLAGS_log_each_request) {
LOG(INFO) << "Received response from " << leader
<< " value=" << response.value()
<< " latency=" << cntl.latency_us();
bthread_usleep(1000L * 1000L);
}
}
return NULL;
}

int main(int argc, char* argv[]) {
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
butil::AtExitManager exit_manager;

// Register configuration of target group to RouteTable
if (braft::rtb::update_configuration(FLAGS_group, FLAGS_conf) != 0) {
LOG(ERROR) << "Fail to register configuration " << FLAGS_conf
<< " of group " << FLAGS_group;
return -1;
}

std::vector<bthread_t> tids;
tids.resize(FLAGS_thread_num);
if (!FLAGS_use_bthread) {
for (int i = 0; i < FLAGS_thread_num; ++i) {
if (pthread_create(&tids[i], NULL, sender, NULL) != 0) {
LOG(ERROR) << "Fail to create pthread";
return -1;
}
}
} else {
for (int i = 0; i < FLAGS_thread_num; ++i) {
if (bthread_start_background(&tids[i], NULL, sender, NULL) != 0) {
LOG(ERROR) << "Fail to create bthread";
return -1;
}
}
}

while (!brpc::IsAskedToQuit()) {
sleep(1);
LOG_IF(INFO, !FLAGS_log_each_request)
<< "Sending Request to " << FLAGS_group
<< " (" << FLAGS_conf << ')'
<< " at qps=" << g_latency_recorder.qps(1)
<< " latency=" << g_latency_recorder.latency(1);
}

LOG(INFO) << "Counter client is going to quit";
for (int i = 0; i < FLAGS_thread_num; ++i) {
if (!FLAGS_use_bthread) {
pthread_join(tids[i], NULL);
} else {
bthread_join(tids[i], NULL);
}
}

return 0;
}
25 changes: 25 additions & 0 deletions example/counter/counter_hostname_test/counter.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
syntax="proto2";
package example;
option cc_generic_services = true;

message Snapshot {
required int64 value = 1;
};

message FetchAddRequest {
required int64 value = 1;
};

message CounterResponse {
required bool success = 1;
optional int64 value = 2;
optional string redirect = 3;
};

message GetRequest {
};

service CounterService {
rpc fetch_add(FetchAddRequest) returns (CounterResponse);
rpc get(GetRequest) returns (CounterResponse);
};
Loading