diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 19bcdae6963627..e508bbe6985ddd 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -38,6 +38,7 @@ #include "olap/utils.h" #include "common/resource_tls.h" #include "agent/cgroups_mgr.h" +#include "service/backend_options.h" using std::deque; using std::list; @@ -69,7 +70,7 @@ TaskWorkerPool::TaskWorkerPool( _agent_utils = new AgentUtils(); _master_client = new MasterServerClient(_master_info, &_master_service_client_cache); _command_executor = new CommandExecutor(); - _backend.__set_host(_agent_utils->get_local_ip()); + _backend.__set_host(BackendOptions::get_localhost()); _backend.__set_be_port(config::be_port); _backend.__set_http_port(config::webserver_port); } diff --git a/be/src/agent/utils.cpp b/be/src/agent/utils.cpp index ed423fdb8a0c37..6c9dc1904d4017 100644 --- a/be/src/agent/utils.cpp +++ b/be/src/agent/utils.cpp @@ -260,25 +260,6 @@ AgentStatus AgentUtils::rsync_from_remote( return PALO_SUCCESS; } -char* AgentUtils::get_local_ip() { - char hname[128]; - gethostname(hname, sizeof(hname)); - - // Let's hope this is not broken in the glibc we're using - struct hostent hent; - struct hostent *he = 0; - char hbuf[2048]; - int err = 0; - if (gethostbyname_r(hname, &hent, hbuf, sizeof(hbuf), &he, &err) != 0 - || he == 0) { - LOG(ERROR) << "gethostbyname : " << hname << ", " - << "error: " << err; - return NULL; - } - - return inet_ntoa(*(struct in_addr*)(he->h_addr_list[0])); -} - std::string AgentUtils::print_agent_status(AgentStatus status) { switch (status) { case PALO_SUCCESS: diff --git a/be/src/agent/utils.h b/be/src/agent/utils.h index 46e58a9fb41f24..e00c3e686ce75f 100644 --- a/be/src/agent/utils.h +++ b/be/src/agent/utils.h @@ -126,9 +126,6 @@ class AgentUtils { const uint32_t transport_speed_limit_kbps, const uint32_t timeout_second); - // Get ip of local service - virtual char* get_local_ip(); - // Print AgentStatus as string virtual std::string print_agent_status(AgentStatus status); diff --git a/be/src/exec/analytic_eval_node.cpp b/be/src/exec/analytic_eval_node.cpp index 358d6c1bba9c13..e5ae99bfe43fb6 100644 --- a/be/src/exec/analytic_eval_node.cpp +++ b/be/src/exec/analytic_eval_node.cpp @@ -23,7 +23,7 @@ #include "exprs/agg_fn_evaluator.h" #include "exprs/anyval_util.h" -#include "runtime/buffered_tuple_stream.hpp" +#include "runtime/buffered_tuple_stream.h" #include "runtime/descriptors.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 20f5719796133b..bd106c3af7eebe 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -21,6 +21,7 @@ #include "olap_scan_node.h" #include "olap_utils.h" #include "olap/olap_reader.h" +#include "service/backend_options.h" #include "runtime/descriptors.h" #include "runtime/runtime_state.h" #include "runtime/mem_pool.h" @@ -135,8 +136,7 @@ Status OlapScanner::open() { fetch_request.__set_aggregation(_aggregation); if (!_reader->init(fetch_request, &_vec_conjunct_ctxs, _profile).ok()) { - std::string local_ip; - get_local_ip(&local_ip); + std::string local_ip = BackendOptions::get_localhost(); std::stringstream ss; if (MemTracker::limit_exceeded(*_runtime_state->mem_trackers())) { ss << "Memory limit exceeded. Tablet: " << fetch_request.tablet_id << ". host: " << local_ip; diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp index 49b66166392c53..20de1df8351e41 100644 --- a/be/src/http/action/mini_load.cpp +++ b/be/src/http/action/mini_load.cpp @@ -39,6 +39,7 @@ #include "http/http_channel.h" #include "http/http_parser.h" #include "olap/file_helper.h" +#include "service/backend_options.h" #include "util/url_coding.h" #include "util/file_utils.h" #include "runtime/exec_env.h" @@ -328,7 +329,7 @@ Status MiniLoadAction::load( } req.__set_properties(params); req.files.push_back(file_path); - req.backend.__set_hostname(*_exec_env->local_ip()); + req.backend.__set_hostname(BackendOptions::get_localhost()); req.backend.__set_port(config::be_port); struct timeval tv; diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 437f9a0874912a..16a39d08e7770d 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -27,7 +27,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/runtime") set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/runtime") add_library(Runtime STATIC - broker_mgr.cpp + broker_mgr.cpp buffered_block_mgr.cpp buffered_tuple_stream.cpp buffered_tuple_stream_ir.cpp diff --git a/be/src/runtime/broker_mgr.cpp b/be/src/runtime/broker_mgr.cpp index 7463331eaec478..455bcc396bb47d 100644 --- a/be/src/runtime/broker_mgr.cpp +++ b/be/src/runtime/broker_mgr.cpp @@ -20,6 +20,7 @@ #include "common/config.h" #include "gen_cpp/PaloBrokerService_types.h" #include "gen_cpp/TPaloBrokerService.h" +#include "service/backend_options.h" #include "runtime/exec_env.h" #include "runtime/client_cache.h" #include "util/thrift_util.h" @@ -37,7 +38,7 @@ BrokerMgr::~BrokerMgr() { void BrokerMgr::init() { std::stringstream ss; - ss << *_exec_env->local_ip() << ":" << config::be_port; + ss << BackendOptions::get_localhost() << ":" << config::be_port; _client_id = ss.str(); } diff --git a/be/src/runtime/buffered_tuple_stream.cpp b/be/src/runtime/buffered_tuple_stream.cpp index ca9c8ed8b82e4f..2ffb72d0763a69 100644 --- a/be/src/runtime/buffered_tuple_stream.cpp +++ b/be/src/runtime/buffered_tuple_stream.cpp @@ -439,4 +439,56 @@ int BufferedTupleStream::compute_row_size(TupleRow* row) const { return size; } + +inline uint8_t* BufferedTupleStream::allocate_row(int size) { + DCHECK(!_closed); + + if (UNLIKELY(_write_block == NULL || _write_block->bytes_remaining() < size)) { + bool got_block = false; + _status = new_block_for_write(size, &got_block); + + if (!_status.ok() || !got_block) { + return NULL; + } + } + + DCHECK(_write_block != NULL); + // DCHECK(_write_block->is_pinned()); + DCHECK_GE(_write_block->bytes_remaining(), size); + ++_num_rows; + _write_block->add_row(); + return _write_block->allocate(size); +} + +inline void BufferedTupleStream::get_tuple_row(const RowIdx& idx, TupleRow* row) const { + DCHECK(!_closed); + //DCHECK(is_pinned()); + DCHECK(!_delete_on_read); + DCHECK_EQ(_blocks.size(), _block_start_idx.size()); + DCHECK_LT(idx.block(), _blocks.size()); + + uint8_t* data = _block_start_idx[idx.block()] + idx.offset(); + + if (_nullable_tuple) { + // Stitch together the tuples from the block and the NULL ones. + const int tuples_per_row = _desc.tuple_descriptors().size(); + uint32_t tuple_idx = idx.idx() * tuples_per_row; + + for (int i = 0; i < tuples_per_row; ++i) { + const uint8_t* null_word = _block_start_idx[idx.block()] + (tuple_idx >> 3); + const uint32_t null_pos = tuple_idx & 7; + const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0); + row->set_tuple(i, reinterpret_cast( + reinterpret_cast(data) * is_not_null)); + data += _desc.tuple_descriptors()[i]->byte_size() * is_not_null; + ++tuple_idx; + } + } else { + for (int i = 0; i < _desc.tuple_descriptors().size(); ++i) { + row->set_tuple(i, reinterpret_cast(data)); + data += _desc.tuple_descriptors()[i]->byte_size(); + } + } +} + } diff --git a/be/src/runtime/buffered_tuple_stream.h b/be/src/runtime/buffered_tuple_stream.h index 9181b52833e815..284beb1c73e3d5 100644 --- a/be/src/runtime/buffered_tuple_stream.h +++ b/be/src/runtime/buffered_tuple_stream.h @@ -412,6 +412,23 @@ class BufferedTupleStream { int compute_num_null_indicator_bytes(int block_size) const; }; +inline bool BufferedTupleStream::add_row(TupleRow* row, uint8_t** dst) { + DCHECK(!_closed); + + if (LIKELY(deep_copy(row, dst))) { + return true; + } + + bool got_block = false; + _status = new_block_for_write(compute_row_size(row), &got_block); + + if (!_status.ok() || !got_block) { + return false; + } + + return deep_copy(row, dst); +} + } #endif diff --git a/be/src/runtime/buffered_tuple_stream.hpp b/be/src/runtime/buffered_tuple_stream.hpp deleted file mode 100644 index fbb19d262eb88f..00000000000000 --- a/be/src/runtime/buffered_tuple_stream.hpp +++ /dev/null @@ -1,101 +0,0 @@ -// Modifications copyright (C) 2017, Baidu.com, Inc. -// Copyright 2017 The Apache Software Foundation - -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef INF_PALO_QE_SRC_BE_RUNTIME_TUPLE_BUFFERED_STREAM_INLINE_H -#define INF_PALO_QE_SRC_BE_RUNTIME_TUPLE_BUFFERED_STREAM_INLINE_H - -#include "runtime/buffered_tuple_stream.h" - -#include "runtime/descriptors.h" -#include "runtime/tuple_row.h" - -namespace palo { - -inline bool BufferedTupleStream::add_row(TupleRow* row, uint8_t** dst) { - DCHECK(!_closed); - - if (LIKELY(deep_copy(row, dst))) { - return true; - } - - bool got_block = false; - _status = new_block_for_write(compute_row_size(row), &got_block); - - if (!_status.ok() || !got_block) { - return false; - } - - return deep_copy(row, dst); -} - -inline uint8_t* BufferedTupleStream::allocate_row(int size) { - DCHECK(!_closed); - - if (UNLIKELY(_write_block == NULL || _write_block->bytes_remaining() < size)) { - bool got_block = false; - _status = new_block_for_write(size, &got_block); - - if (!_status.ok() || !got_block) { - return NULL; - } - } - - DCHECK(_write_block != NULL); - // DCHECK(_write_block->is_pinned()); - DCHECK_GE(_write_block->bytes_remaining(), size); - ++_num_rows; - _write_block->add_row(); - return _write_block->allocate(size); -} - -inline void BufferedTupleStream::get_tuple_row(const RowIdx& idx, TupleRow* row) const { - DCHECK(!_closed); - //DCHECK(is_pinned()); - DCHECK(!_delete_on_read); - DCHECK_EQ(_blocks.size(), _block_start_idx.size()); - DCHECK_LT(idx.block(), _blocks.size()); - - uint8_t* data = _block_start_idx[idx.block()] + idx.offset(); - - if (_nullable_tuple) { - // Stitch together the tuples from the block and the NULL ones. - const int tuples_per_row = _desc.tuple_descriptors().size(); - uint32_t tuple_idx = idx.idx() * tuples_per_row; - - for (int i = 0; i < tuples_per_row; ++i) { - const uint8_t* null_word = _block_start_idx[idx.block()] + (tuple_idx >> 3); - const uint32_t null_pos = tuple_idx & 7; - const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0); - row->set_tuple(i, reinterpret_cast( - reinterpret_cast(data) * is_not_null)); - data += _desc.tuple_descriptors()[i]->byte_size() * is_not_null; - ++tuple_idx; - } - } else { - for (int i = 0; i < _desc.tuple_descriptors().size(); ++i) { - row->set_tuple(i, reinterpret_cast(data)); - data += _desc.tuple_descriptors()[i]->byte_size(); - } - } -} - -} - -#endif diff --git a/be/src/runtime/buffered_tuple_stream_ir.cpp b/be/src/runtime/buffered_tuple_stream_ir.cpp index d5c85b9f4824ec..eeb541addd2daf 100644 --- a/be/src/runtime/buffered_tuple_stream_ir.cpp +++ b/be/src/runtime/buffered_tuple_stream_ir.cpp @@ -18,7 +18,7 @@ // specific language governing permissions and limitations // under the License. -#include "runtime/buffered_tuple_stream.hpp" +#include "runtime/buffered_tuple_stream.h" #include "runtime/descriptors.h" #include "runtime/tuple_row.h" diff --git a/be/src/runtime/etl_job_mgr.cpp b/be/src/runtime/etl_job_mgr.cpp index 4d21188a114055..574f15df13b851 100644 --- a/be/src/runtime/etl_job_mgr.cpp +++ b/be/src/runtime/etl_job_mgr.cpp @@ -20,6 +20,7 @@ #include "gen_cpp/Status_types.h" #include "gen_cpp/Types_types.h" +#include "service/backend_options.h" #include "util/debug_util.h" #include "runtime/exec_env.h" #include "runtime/plan_fragment_executor.h" @@ -37,7 +38,7 @@ namespace palo { std::string EtlJobMgr::to_http_path(const std::string& file_name) { std::stringstream url; - url << "http://" << *_exec_env->local_ip() << ":" << config::webserver_port + url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port << "/api/_download_load?file=" << file_name; return url.str(); } diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index cc5cdac3af2574..2fcda7f501ebd1 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -87,7 +87,6 @@ ExecEnv::ExecEnv() : _fragment_mgr(new FragmentMgr(this)), _master_info(new TMasterInfo()), _etl_job_mgr(new EtlJobMgr(this)), - _local_ip(new std::string()), _load_path_mgr(new LoadPathMgr()), _disk_io_mgr(new DiskIoMgr()), _tmp_file_mgr(new TmpFileMgr), @@ -96,7 +95,6 @@ ExecEnv::ExecEnv() : _broker_mgr(new BrokerMgr(this)), _enable_webserver(true), _tz_database(TimezoneDatabase()) { - get_local_ip(_local_ip.get()); _client_cache->init_metrics(_metrics.get(), "palo.backends"); //_frontend_client_cache->init_metrics(_metrics.get(), "frontend-server.backends"); _result_mgr->init(); diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 3af33cb2958412..69193b758f9bc0 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -135,10 +135,6 @@ class ExecEnv { return _tmp_file_mgr.get(); } - std::string* local_ip() { - return _local_ip.get(); - } - BfdParser* bfd_parser() const { return _bfd_parser.get(); } @@ -184,7 +180,6 @@ class ExecEnv { boost::scoped_ptr _fragment_mgr; boost::scoped_ptr _master_info; boost::scoped_ptr _etl_job_mgr; - boost::scoped_ptr _local_ip; boost::scoped_ptr _load_path_mgr; boost::scoped_ptr _disk_io_mgr; boost::scoped_ptr _tmp_file_mgr; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 1961806b29270a..6efee680119e2f 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -28,6 +28,7 @@ #include "agent/cgroups_mgr.h" #include "common/resource_tls.h" +#include "service/backend_options.h" #include "runtime/plan_fragment_executor.h" #include "runtime/exec_env.h" #include "runtime/datetime_value.h" @@ -208,7 +209,7 @@ void FragmentExecState::callback(const Status& status, RuntimeProfile* profile, std::string FragmentExecState::to_http_path(const std::string& file_name) { std::stringstream url; - url << "http://" << *_exec_env->local_ip() << ":" << config::webserver_port + url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port << "/api/_download_load?file=" << file_name; return url.str(); } diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 36d87e32683d02..6fc316de0db2d7 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -29,10 +29,11 @@ #include "common/object_pool.h" #include "common/status.h" #include "exprs/expr.h" +#include "runtime/buffered_block_mgr.h" +#include "runtime/buffered_block_mgr2.h" #include "runtime/descriptors.h" #include "runtime/runtime_state.h" #include "runtime/load_path_mgr.h" -// #include "runtime/data_stream_recvr.hpp" #include "util/cpu_info.h" #include "util/mem_info.h" #include "util/debug_util.h" diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 8323589f94ee89..c63886d5ea8684 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -42,8 +42,6 @@ #include "gen_cpp/Types_types.h" // for TUniqueId #include "gen_cpp/PaloInternalService_types.h" // for TQueryOptions #include "util/runtime_profile.h" -#include "runtime/buffered_block_mgr.h" -#include "runtime/buffered_block_mgr2.h" namespace palo { diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt index 82e2e54f2e9531..453f7bd6dbdf27 100644 --- a/be/src/service/CMakeLists.txt +++ b/be/src/service/CMakeLists.txt @@ -20,6 +20,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/service") set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/service") add_library(Service + backend_options.cpp backend_service.cpp ) diff --git a/be/src/service/backend_options.cpp b/be/src/service/backend_options.cpp new file mode 100644 index 00000000000000..11537d79f8b4b3 --- /dev/null +++ b/be/src/service/backend_options.cpp @@ -0,0 +1,60 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "service/backend_options.h" + +#include "common/logging.h" +#include "common/status.h" +#include "util/network_util.h" + +namespace palo { + +std::string BackendOptions::_localhost; + +void BackendOptions::init() { + std::vector hosts; + Status status = get_hosts_v4(&hosts); + + if (!status.ok()) { + LOG(FATAL) << status.get_error_msg(); + } + + if (hosts.empty()) { + LOG(FATAL) << "failed to get host"; + } + + std::string loopback; + std::vector::iterator addr_it = hosts.begin(); + for (; addr_it != hosts.end(); ++addr_it) { + if ((*addr_it).is_address_v4()) { + if ((*addr_it).is_loopback_v4()) { + loopback = (*addr_it).get_host_address_v4(); + } else { + _localhost = (*addr_it).get_host_address_v4(); + break; + } + } + } + + if (_localhost.empty()) { + _localhost = loopback; + } +} + +std::string BackendOptions::get_localhost() { + return _localhost; +} + +} diff --git a/be/src/service/backend_options.h b/be/src/service/backend_options.h new file mode 100644 index 00000000000000..6625884b552b6b --- /dev/null +++ b/be/src/service/backend_options.h @@ -0,0 +1,36 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BDG_PALO_BE_SERVICE_BACKEND_OPTIONS_H +#define BDG_PALO_BE_SERVICE_BACKEND_OPTIONS_H + +#include +#include + +namespace palo { + +class BackendOptions { +public: + static void init(); + static std::string get_localhost(); +private: + static std::string _localhost; + + DISALLOW_COPY_AND_ASSIGN(BackendOptions); +}; + +} + +#endif //BDG_PALO_BE_SERVICE_BACKEND_OPTIONS_H diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 0e3fd16e7d3710..54fa546f3299f8 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -24,6 +24,7 @@ #include #include #include +#include "service/backend_options.h" #include "util/network_util.h" #include "util/thrift_util.h" #include "util/thrift_server.h" @@ -83,11 +84,8 @@ Status BackendService::create_service(ExecEnv* exec_env, int port, ThriftServer* Status BackendService::create_rpc_service(ExecEnv* exec_env) { ReactorFactory::initialize(config::rpc_reactor_threads); - std::string host; - get_hostname(&host); - struct sockaddr_in addr; - InetAddr::initialize(&addr, host.c_str(), config::be_rpc_port); + InetAddr::initialize(&addr, BackendOptions::get_localhost().c_str(), config::be_rpc_port); Comm* comm = Comm::instance(); DispatchHandlerPtr dhp = std::make_shared(exec_env, comm, nullptr); diff --git a/be/src/service/palo_main.cpp b/be/src/service/palo_main.cpp index 1852596722ea60..92f089b94aa133 100644 --- a/be/src/service/palo_main.cpp +++ b/be/src/service/palo_main.cpp @@ -40,6 +40,7 @@ #include "agent/topic_subscriber.h" #include "util/palo_metrics.h" #include "olap/olap_main.h" +#include "service/backend_options.h" #include "service/backend_service.h" #include #include "common/resource_tls.h" @@ -103,6 +104,7 @@ int main(int argc, char** argv) { palo::init_daemon(argc, argv); palo::ResourceTls::init(); + palo::BackendOptions::init(); // initialize storage if (0 != palo::olap_main(argc, argv)) { @@ -115,6 +117,7 @@ int main(int argc, char** argv) { palo::ExecEnv exec_env; palo::FrontendHelper::setup(&exec_env); palo::ThriftServer* be_server = nullptr; + EXIT_IF_ERROR(palo::BackendService::create_service( &exec_env, palo::config::be_port, diff --git a/be/src/util/network_util.cpp b/be/src/util/network_util.cpp index 8634d8d0c3e7c2..fae915390fb486 100644 --- a/be/src/util/network_util.cpp +++ b/be/src/util/network_util.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include @@ -33,6 +34,25 @@ namespace palo { +InetAddress::InetAddress(struct sockaddr* addr) { + this->addr = *(struct sockaddr_in*)addr; +} + +bool InetAddress::is_address_v4 () const { + return addr.sin_family == AF_INET; +} + +bool InetAddress::is_loopback_v4() { + in_addr_t s_addr = addr.sin_addr.s_addr; + return (ntohl(s_addr) & 0xFF000000) == 0x7F000000; +} + +std::string InetAddress::get_host_address_v4() { + char addr_buf[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &(addr.sin_addr), addr_buf, INET_ADDRSTRLEN); + return std::string(addr_buf); +} + static const std::string LOCALHOST("127.0.0.1"); Status get_hostname(std::string* hostname) { @@ -96,7 +116,7 @@ bool find_first_non_localhost(const std::vector& addresses, std::st return false; } -Status get_local_ip(std::string* local_ip) { +Status get_hosts_v4(std::vector* hosts) { ifaddrs* if_addrs = nullptr; if (getifaddrs(&if_addrs)) { std::stringstream ss; @@ -111,15 +131,11 @@ Status get_local_ip(std::string* local_ip) { } if (if_addr->ifa_addr->sa_family == AF_INET) { // check it is IP4 // is a valid IP4 Address - void* tmp_addr = &((struct sockaddr_in *)if_addr->ifa_addr)->sin_addr; - char addr_buf[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, tmp_addr, addr_buf, INET_ADDRSTRLEN); - if (LOCALHOST == addr_buf) { - continue; - } - local_ip->assign(addr_buf); - break; - } else if (if_addr->ifa_addr->sa_family == AF_INET6) { // check it is IP6 + hosts->emplace_back(if_addr->ifa_addr); + } + //TODO: IPv6 + /* + else if (if_addr->ifa_addr->sa_family == AF_INET6) { // check it is IP6 // is a valid IP6 Address void* tmp_addr = &((struct sockaddr_in6 *)if_addr->ifa_addr)->sin6_addr; char addr_buf[INET6_ADDRSTRLEN]; @@ -127,6 +143,7 @@ Status get_local_ip(std::string* local_ip) { local_ip->assign(addr_buf); break; } + */ } if (if_addrs != nullptr) { diff --git a/be/src/util/network_util.h b/be/src/util/network_util.h index 9144fd1ed3bfab..c069deb2ac5e5c 100644 --- a/be/src/util/network_util.h +++ b/be/src/util/network_util.h @@ -27,6 +27,17 @@ namespace palo { +//TODO: ipv6 +class InetAddress { +public: + InetAddress(struct sockaddr* addr); + bool is_address_v4() const; + bool is_loopback_v4(); + std::string get_host_address_v4(); +private: + struct sockaddr_in addr; +}; + // Looks up all IP addresses associated with a given hostname. Returns // an error status if any system call failed, otherwise OK. Even if OK // is returned, addresses may still be of zero length. @@ -40,7 +51,7 @@ bool find_first_non_localhost(const std::vector& addresses, std::st // Returns OK if a hostname can be found, false otherwise. Status get_hostname(std::string* hostname); -Status get_local_ip(std::string* local_ip); +Status get_hosts_v4(std::vector* hosts); // Utility method because Thrift does not supply useful constructors TNetworkAddress make_network_address(const std::string& hostname, int port); diff --git a/be/test/agent/utils_test.cpp b/be/test/agent/utils_test.cpp index 5aae2d0439df04..db1645f8fe4892 100644 --- a/be/test/agent/utils_test.cpp +++ b/be/test/agent/utils_test.cpp @@ -17,6 +17,7 @@ #include "gtest/gtest.h" #include "gmock/gmock.h" #include "agent/utils.h" +#include "service/backend_options.h" #include "util/logging.h" using ::testing::_; @@ -27,9 +28,7 @@ using std::string; namespace palo { TEST(AgentUtilsTest, Test) { - char* host_name; - AgentUtils agent_utils; - host_name = agent_utils.get_local_ip(); + const char* host_name = BackendOptions::get_localhost().c_str(); int cnt = std::count(host_name, host_name + 17, '.'); EXPECT_EQ(3, cnt); } @@ -42,6 +41,8 @@ int main(int argc, char **argv) { fprintf(stderr, "error read config file. \n"); return -1; } + + palo::BackendOptions::init(); palo::init_glog("be-test"); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); diff --git a/fe/src/com/baidu/palo/PaloFe.java b/fe/src/com/baidu/palo/PaloFe.java index 66575df4adfc85..cef5846cb66ecd 100644 --- a/fe/src/com/baidu/palo/PaloFe.java +++ b/fe/src/com/baidu/palo/PaloFe.java @@ -21,6 +21,7 @@ import com.baidu.palo.qe.QeService; import com.baidu.palo.service.ExecuteEnv; import com.baidu.palo.service.FeServer; +import com.baidu.palo.service.FrontendOptions; import com.google.common.base.Charsets; import com.google.common.base.Strings; @@ -64,6 +65,7 @@ public static void main(String[] args) { LOG.info("Palo FE start"); + FrontendOptions.init(); ExecuteEnv.setup(); ExecuteEnv env = ExecuteEnv.getInstance(); diff --git a/fe/src/com/baidu/palo/catalog/Catalog.java b/fe/src/com/baidu/palo/catalog/Catalog.java index e74a0f20f77a18..eb468fb83435ea 100644 --- a/fe/src/com/baidu/palo/catalog/Catalog.java +++ b/fe/src/com/baidu/palo/catalog/Catalog.java @@ -134,6 +134,7 @@ import com.baidu.palo.qe.JournalObservable; import com.baidu.palo.qe.SessionVariable; import com.baidu.palo.qe.VariableMgr; +import com.baidu.palo.service.FrontendOptions; import com.baidu.palo.system.Backend; import com.baidu.palo.system.Frontend; import com.baidu.palo.system.SystemInfoService; @@ -616,17 +617,7 @@ private FrontendNodeType getFeNodeType() { } private void getSelfHostPort() { - InetAddress addr = null; - try { - addr = InetAddress.getLocalHost(); - } catch (UnknownHostException e) { - LOG.error(e); - System.out.println("error to get local address. will exit"); - System.exit(-1); - } - - String myIP = addr.getHostAddress().toString(); - selfNode = new Pair(myIP, Config.edit_log_port); + selfNode = new Pair(FrontendOptions.getLocalHostAddress(), Config.edit_log_port); } private void checkArgs(String[] args) throws AnalysisException { @@ -702,8 +693,7 @@ private void transferToMaster() throws IOException { LOG.info("checkpointer thread started. thread id is {}", checkpointThreadId); // ClusterInfoService - InetAddress masterAddress = InetAddress.getLocalHost(); - Catalog.getCurrentSystemInfo().setMaster(masterAddress.getHostAddress(), Config.rpc_port, clusterId, epoch); + Catalog.getCurrentSystemInfo().setMaster(FrontendOptions.getLocalHostAddress(), Config.rpc_port, clusterId, epoch); Catalog.getCurrentSystemInfo().start(); pullLoadJobMgr.start(); @@ -732,12 +722,12 @@ private void transferToMaster() throws IOException { // catalog recycle bin getRecycleBin().start(); - this.masterIp = masterAddress.getHostAddress(); + this.masterIp = FrontendOptions.getLocalHostAddress(); this.masterRpcPort = Config.rpc_port; this.masterHttpPort = Config.http_port; MasterInfo info = new MasterInfo(); - info.setIp(masterAddress.getHostAddress()); + info.setIp(FrontendOptions.getLocalHostAddress()); info.setRpcPort(Config.rpc_port); info.setHttpPort(Config.http_port); editLog.logMasterInfo(info); diff --git a/fe/src/com/baidu/palo/common/util/NetUtils.java b/fe/src/com/baidu/palo/common/util/NetUtils.java index 0124e0913240be..e08c26a950a25b 100644 --- a/fe/src/com/baidu/palo/common/util/NetUtils.java +++ b/fe/src/com/baidu/palo/common/util/NetUtils.java @@ -1,13 +1,8 @@ -// Modifications copyright (C) 2017, Baidu.com, Inc. -// Copyright 2017 The Apache Software Foundation - -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // @@ -20,7 +15,12 @@ package com.baidu.palo.common.util; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; +import java.util.List; public class NetUtils { @@ -36,4 +36,23 @@ public static InetSocketAddress createSocketAddr(String target) { return new InetSocketAddress(hostname, port); } + + public static void getHosts(List hosts) { + Enumeration n = null; + + try { + n = NetworkInterface.getNetworkInterfaces(); + } catch (SocketException e1) { + throw new RuntimeException("failed to get network interfaces"); + } + + while (n.hasMoreElements()) { + NetworkInterface e = n.nextElement(); + Enumeration a = e.getInetAddresses(); + while (a.hasMoreElements()) { + InetAddress addr = a.nextElement(); + hosts.add(addr); + } + } + } } diff --git a/fe/src/com/baidu/palo/planner/BrokerScanNode.java b/fe/src/com/baidu/palo/planner/BrokerScanNode.java index fafed20b4cc664..b6b1fd917d63f7 100644 --- a/fe/src/com/baidu/palo/planner/BrokerScanNode.java +++ b/fe/src/com/baidu/palo/planner/BrokerScanNode.java @@ -43,6 +43,7 @@ import com.baidu.palo.common.Config; import com.baidu.palo.common.InternalException; import com.baidu.palo.load.BrokerFileGroup; +import com.baidu.palo.service.FrontendOptions; import com.baidu.palo.system.Backend; import com.baidu.palo.thrift.TBrokerFileStatus; import com.baidu.palo.thrift.TBrokerListPathRequest; @@ -483,12 +484,8 @@ private TBrokerScanRange brokerScanRange(TScanRangeLocations locations) { private void parseBrokerFile(String path, ArrayList fileStatuses) throws InternalException { BrokerMgr.BrokerAddress brokerAddress = null; try { - String localIp = ""; - try { - localIp = InetAddress.getLocalHost().getHostAddress(); - } catch (UnknownHostException e) { - } - brokerAddress = Catalog.getInstance().getBrokerMgr().getBroker(brokerDesc.getName(), localIp); + String localIP = FrontendOptions.getLocalHostAddress(); + brokerAddress = Catalog.getInstance().getBrokerMgr().getBroker(brokerDesc.getName(), localIP); } catch (AnalysisException e) { throw new InternalException(e.getMessage()); } diff --git a/fe/src/com/baidu/palo/planner/SchemaScanNode.java b/fe/src/com/baidu/palo/planner/SchemaScanNode.java index 1534198109daff..e0019533380bf0 100644 --- a/fe/src/com/baidu/palo/planner/SchemaScanNode.java +++ b/fe/src/com/baidu/palo/planner/SchemaScanNode.java @@ -26,6 +26,7 @@ import com.baidu.palo.common.Config; import com.baidu.palo.common.InternalException; import com.baidu.palo.qe.ConnectContext; +import com.baidu.palo.service.FrontendOptions; import com.baidu.palo.thrift.TPlanNode; import com.baidu.palo.thrift.TPlanNodeType; import com.baidu.palo.thrift.TScanRangeLocations; @@ -50,7 +51,7 @@ public class SchemaScanNode extends ScanNode { private String schemaTable; private String schemaWild; private String user; - private String frontendIp; + private String frontendIP; private int frontendPort; /** @@ -74,11 +75,7 @@ public void finalize(Analyzer analyzer) throws InternalException { schemaTable = analyzer.getSchemaTable(); schemaWild = analyzer.getSchemaWild(); user = analyzer.getUser(); - try { - frontendIp = InetAddress.getLocalHost().getHostAddress(); - } catch (UnknownHostException e) { - throw new InternalException("get host failed."); - } + frontendIP = FrontendOptions.getLocalHostAddress(); frontendPort = Config.rpc_port; } @@ -109,7 +106,7 @@ protected void toThrift(TPlanNode msg) { if (ctx != null) { msg.schema_scan_node.setThread_id(ConnectContext.get().getConnectionId()); } - msg.schema_scan_node.setIp(frontendIp); + msg.schema_scan_node.setIp(frontendIP); msg.schema_scan_node.setPort(frontendPort); } diff --git a/fe/src/com/baidu/palo/qe/Coordinator.java b/fe/src/com/baidu/palo/qe/Coordinator.java index 22b936c7b90afa..08dd031d272315 100644 --- a/fe/src/com/baidu/palo/qe/Coordinator.java +++ b/fe/src/com/baidu/palo/qe/Coordinator.java @@ -34,6 +34,7 @@ import com.baidu.palo.planner.Planner; import com.baidu.palo.planner.ResultSink; import com.baidu.palo.planner.ScanNode; +import com.baidu.palo.service.FrontendOptions; import com.baidu.palo.system.Backend; import com.baidu.palo.task.LoadEtlTask; import com.baidu.palo.thrift.BackendService; @@ -93,16 +94,7 @@ public class Coordinator { private static final Logger LOG = LogManager.getLogger(Coordinator.class); private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - private static String localIP; - - static { - try { - localIP = InetAddress.getLocalHost().getHostAddress().toString(); - } catch (UnknownHostException e) { - LOG.warn(DebugUtil.getStackTrace(e)); - localIP = "127.0.0.1"; - } - } + private static String localIP = FrontendOptions.getLocalHostAddress(); // Overall status of the entire query; set to the first reported fragment error // status or to CANCELLED, if Cancel() is called. diff --git a/fe/src/com/baidu/palo/service/FeServer.java b/fe/src/com/baidu/palo/service/FeServer.java index 986c13d4b4f3ae..cd1da1e7ae4724 100644 --- a/fe/src/com/baidu/palo/service/FeServer.java +++ b/fe/src/com/baidu/palo/service/FeServer.java @@ -1,13 +1,8 @@ -// Modifications copyright (C) 2017, Baidu.com, Inc. -// Copyright 2017 The Apache Software Foundation +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // diff --git a/fe/src/com/baidu/palo/service/FrontendOptions.java b/fe/src/com/baidu/palo/service/FrontendOptions.java new file mode 100644 index 00000000000000..2cdf5cea13f697 --- /dev/null +++ b/fe/src/com/baidu/palo/service/FrontendOptions.java @@ -0,0 +1,69 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.baidu.palo.service; + +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.List; +import java.util.ArrayList; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.baidu.palo.common.util.NetUtils; + + +public class FrontendOptions { + private static final Logger LOG = LogManager.getLogger(FrontendOptions.class); + + public static void init() { + List hosts = new ArrayList(); + NetUtils.getHosts(hosts); + if (hosts.isEmpty()) { + LOG.error("fail to get localhost"); + System.exit(-1); + } + + InetAddress loopBack = null; + for (InetAddress addr : hosts) { + if (addr instanceof Inet4Address) { + if (addr.isLoopbackAddress()) { + loopBack = addr; + } else { + localHost = addr; + break; + } + } + } + + if (localHost == null) { + localHost = loopBack; + } + } + + public static InetAddress getLocalHost() { + return localHost; + } + + public static String getLocalHostAddress() { + return localHost.getHostAddress(); + } + + private static InetAddress localHost; +}; + diff --git a/fe/src/com/baidu/palo/service/FrontendServiceImpl.java b/fe/src/com/baidu/palo/service/FrontendServiceImpl.java index 3af3f48dc45cfd..f3b97c00789d7f 100644 --- a/fe/src/com/baidu/palo/service/FrontendServiceImpl.java +++ b/fe/src/com/baidu/palo/service/FrontendServiceImpl.java @@ -1,13 +1,8 @@ -// Modifications copyright (C) 2017, Baidu.com, Inc. -// Copyright 2017 The Apache Software Foundation - -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // @@ -43,6 +38,7 @@ import com.baidu.palo.qe.ConnectProcessor; import com.baidu.palo.qe.QeProcessor; import com.baidu.palo.qe.VariableMgr; +import com.baidu.palo.service.FrontendOptions; import com.baidu.palo.system.SystemInfoService; import com.baidu.palo.thrift.FrontendService; import com.baidu.palo.thrift.FrontendServiceVersion; @@ -323,7 +319,7 @@ public static String getMiniLoadStmt(TMiniLoadRequest request) throws UnknownHos stringBuilder.append("\"{").append(Joiner.on(",").join(request.files)).append("}\""); } - InetAddress masterAddress = InetAddress.getLocalHost(); + InetAddress masterAddress = FrontendOptions.getLocalHost(); stringBuilder.append(" http://").append(masterAddress.getHostAddress()).append(":"); stringBuilder.append(Config.http_port).append("/api/").append(request.db).append("/"); stringBuilder.append(request.tbl).append("/_load?label=").append(request.label); diff --git a/fe/src/com/baidu/palo/system/SystemInfoService.java b/fe/src/com/baidu/palo/system/SystemInfoService.java index 65bf765a9c0d54..e92fe9627193ea 100644 --- a/fe/src/com/baidu/palo/system/SystemInfoService.java +++ b/fe/src/com/baidu/palo/system/SystemInfoService.java @@ -852,11 +852,6 @@ public static Pair validateHostAndPort(String hostPort) throws host = inetAddress.getHostAddress(); } - if (host.equals("127.0.0.1")) { - InetAddress inetAddress = InetAddress.getLocalHost(); - host = inetAddress.getHostAddress(); - } - // validate port heartbeatPort = Integer.valueOf(pair[1]); diff --git a/fe/src/com/baidu/palo/task/ExportExportingTask.java b/fe/src/com/baidu/palo/task/ExportExportingTask.java index 06586f78a084dd..e7eb19ffcfe879 100644 --- a/fe/src/com/baidu/palo/task/ExportExportingTask.java +++ b/fe/src/com/baidu/palo/task/ExportExportingTask.java @@ -31,6 +31,7 @@ import com.baidu.palo.load.ExportJob; import com.baidu.palo.qe.Coordinator; import com.baidu.palo.qe.QeProcessor; +import com.baidu.palo.service.FrontendOptions; import com.baidu.palo.system.Backend; import com.baidu.palo.thrift.TAgentResult; import com.baidu.palo.thrift.TBrokerOperationStatus; @@ -278,13 +279,8 @@ private Status releaseSnapshotPaths() { private Status moveTmpFiles() { BrokerMgr.BrokerAddress brokerAddress = null; try { - String localIp = ""; - try { - localIp = InetAddress.getLocalHost().getHostAddress(); - } catch (UnknownHostException e) { - // getBroker will deal - } - brokerAddress = Catalog.getInstance().getBrokerMgr().getBroker(job.getBrokerDesc().getName(), localIp); + String localIP = FrontendOptions.getLocalHostAddress(); + brokerAddress = Catalog.getInstance().getBrokerMgr().getBroker(job.getBrokerDesc().getName(), localIP); } catch (AnalysisException e) { String failMsg = "Broker rename failed. msg=" + e.getMessage(); LOG.warn(failMsg);