diff --git a/flex/engines/http_server/service/hqps_service.cc b/flex/engines/http_server/service/hqps_service.cc index 8dfbea727e52..d40748c5e2d2 100644 --- a/flex/engines/http_server/service/hqps_service.cc +++ b/flex/engines/http_server/service/hqps_service.cc @@ -17,6 +17,21 @@ #include "flex/engines/http_server/workdir_manipulator.h" namespace server { +bool check_port_occupied(uint16_t port) { + VLOG(10) << "Check port " << port << " is occupied or not."; + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd == -1) { + return false; + } + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = htonl(INADDR_ANY); + int ret = bind(sockfd, (struct sockaddr*) &addr, sizeof(addr)); + close(sockfd); + return ret < 0; +} + ServiceConfig::ServiceConfig() : bolt_port(DEFAULT_BOLT_PORT), admin_port(DEFAULT_ADMIN_PORT), @@ -27,6 +42,8 @@ ServiceConfig::ServiceConfig() external_thread_num(2), start_admin_service(true), start_compiler(false), + enable_gremlin(true), + enable_bolt(true), metadata_store_type_(gs::MetadataStoreType::kLocalFile) {} const std::string HQPSService::DEFAULT_GRAPH_NAME = "modern_graph"; @@ -183,6 +200,28 @@ void HQPSService::start_query_actors() { } } +bool HQPSService::check_compiler_ready() const { + if (service_config_.start_compiler) { + if (service_config_.enable_gremlin) { + if (check_port_occupied(service_config_.gremlin_port)) { + return true; + } else { + LOG(ERROR) << "Gremlin server is not ready!"; + return false; + } + } + if (service_config_.enable_bolt) { + if (check_port_occupied(service_config_.bolt_port)) { + return true; + } else { + LOG(ERROR) << "Bolt server is not ready!"; + return false; + } + } + } + return true; +} + bool HQPSService::start_compiler_subprocess( const std::string& graph_schema_path) { LOG(INFO) << "Start compiler subprocess"; @@ -213,14 +252,27 @@ bool HQPSService::start_compiler_subprocess( boost::process::child(cmd_str, boost::process::std_out > compiler_log, boost::process::std_err > compiler_log); LOG(INFO) << "Compiler process started with pid: " << compiler_process_.id(); - // sleep for a while to wait for the compiler to start - std::this_thread::sleep_for(std::chrono::seconds(4)); - // check if the compiler process is still running - if (!compiler_process_.running()) { - LOG(ERROR) << "Compiler process failed to start!"; - return false; + // sleep for a maximum 30 seconds to wait for the compiler process to start + int32_t sleep_time = 0; + int32_t max_sleep_time = 30; + int32_t sleep_interval = 4; + while (sleep_time < max_sleep_time) { + std::this_thread::sleep_for(std::chrono::seconds(sleep_interval)); + if (!compiler_process_.running()) { + LOG(ERROR) << "Compiler process failed to start!"; + return false; + } + // check query server port is ready + if (check_compiler_ready()) { + LOG(INFO) << "Compiler server is ready!"; + return true; + } + sleep_time += sleep_interval; + LOG(INFO) << "Sleep " << sleep_time << " seconds to wait for compiler " + << "server to start."; } - return true; + LOG(ERROR) << "Max sleep time reached, fail to start compiler server!"; + return false; } bool HQPSService::stop_compiler_subprocess() { diff --git a/flex/engines/http_server/service/hqps_service.h b/flex/engines/http_server/service/hqps_service.h index 04c2a9026024..d55ac53356ab 100644 --- a/flex/engines/http_server/service/hqps_service.h +++ b/flex/engines/http_server/service/hqps_service.h @@ -53,6 +53,8 @@ struct ServiceConfig { bool start_admin_service; // Whether to start the admin service or only // start the query service. bool start_compiler; + bool enable_gremlin; + bool enable_bolt; gs::MetadataStoreType metadata_store_type_; // Those has not default value @@ -103,6 +105,8 @@ class HQPSService { bool stop_compiler_subprocess(); + bool check_compiler_ready() const; + private: HQPSService() = default; @@ -203,15 +207,23 @@ struct convert { auto endpoint_node = compiler_node["endpoint"]; if (endpoint_node) { auto bolt_node = endpoint_node["bolt_connector"]; - if (bolt_node && bolt_node["port"] && - bolt_node["disabled"].as() == false) { + if (bolt_node && bolt_node["disabled"]) { + service_config.enable_bolt = !bolt_node["disabled"].as(); + } else { + service_config.enable_bolt = true; + } + if (bolt_node && bolt_node["port"]) { service_config.bolt_port = bolt_node["port"].as(); } else { LOG(INFO) << "bolt_port not found, or disabled"; } auto gremlin_node = endpoint_node["gremlin_connector"]; - if (gremlin_node && gremlin_node["port"] && - gremlin_node["disabled"].as() == false) { + if (gremlin_node && gremlin_node["disabled"]) { + service_config.enable_gremlin = !gremlin_node["disabled"].as(); + } else { + service_config.enable_gremlin = true; + } + if (gremlin_node && gremlin_node["port"]) { service_config.gremlin_port = gremlin_node["port"].as(); } else { LOG(INFO) << "gremlin_port not found, use default value "