From 8e89935b00168c3fe50c2ad890355e16b59edf3a Mon Sep 17 00:00:00 2001 From: deardeng <565620795@qq.com> Date: Wed, 28 Aug 2024 14:13:30 +0800 Subject: [PATCH 1/2] [fix](cloud) Fix cloud auto start and add a regression case --- .../doris/cloud/catalog/CloudReplica.java | 14 +- .../cloud/system/CloudSystemInfoService.java | 16 +- .../org/apache/doris/qe/ConnectContext.java | 37 +--- .../multi_cluster/test_auto_start.groovy | 172 ++++++++++++++++++ 4 files changed, 193 insertions(+), 46 deletions(-) create mode 100644 regression-test/suites/cloud_p0/multi_cluster/test_auto_start.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java index be0c510559eda2..2b2d3b1cbdceef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java @@ -148,6 +148,13 @@ public long getBackendId() { } private long getBackendIdImpl(String cluster) { + // if cluster is SUSPENDED, wait + try { + cluster = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).waitForAutoStart(cluster); + } catch (DdlException e) { + // this function cant throw exception. so just log it + LOG.warn("cant resume cluster {}, exception", cluster, e); + } // check default cluster valid. if (Strings.isNullOrEmpty(cluster)) { LOG.warn("failed to get available be, clusterName: {}", cluster); @@ -161,13 +168,6 @@ private long getBackendIdImpl(String cluster) { return -1; } - // if cluster is SUSPENDED, wait - try { - ((CloudSystemInfoService) Env.getCurrentSystemInfo()).waitForAutoStart(cluster); - } catch (DdlException e) { - // this function cant throw exception. 
so just log it - LOG.warn("cant resume cluster {}, exception", cluster, e); - } String clusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterIdByName(cluster); if (isColocated()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java index 48728efb003aba..bc856fde2bea9d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java @@ -728,20 +728,20 @@ public String getClusterNameAutoStart(final String clusterName) { return cloudClusterTypeAndName.clusterName; } - public void waitForAutoStart(String clusterName) throws DdlException { + public String waitForAutoStart(String clusterName) throws DdlException { if (Config.isNotCloudMode()) { - return; + return null; } clusterName = getClusterNameAutoStart(clusterName); if (Strings.isNullOrEmpty(clusterName)) { LOG.warn("auto start in cloud mode, but clusterName empty {}", clusterName); - return; + return null; } String clusterStatus = getCloudStatusByName(clusterName); if (Strings.isNullOrEmpty(clusterStatus)) { // for cluster rename or cluster dropped LOG.warn("cant find clusterStatus in fe, clusterName {}", clusterName); - return; + return null; } if (Cloud.ClusterStatus.valueOf(clusterStatus) == Cloud.ClusterStatus.MANUAL_SHUTDOWN) { @@ -756,7 +756,7 @@ public void waitForAutoStart(String clusterName) throws DdlException { // root ? 
see StatisticsUtil.buildConnectContext if (ConnectContext.get() != null && ConnectContext.get().getUserIdentity().isRootUser()) { LOG.warn("auto start daemon thread run in root, not resume cluster {}-{}", clusterName, clusterStatus); - return; + return null; } Cloud.AlterClusterRequest.Builder builder = Cloud.AlterClusterRequest.newBuilder(); builder.setCloudUniqueId(Config.cloud_unique_id); @@ -785,7 +785,8 @@ public void waitForAutoStart(String clusterName) throws DdlException { StopWatch stopWatch = new StopWatch(); stopWatch.start(); boolean hasAutoStart = false; - while (!String.valueOf(Cloud.ClusterStatus.NORMAL).equals(clusterStatus) + boolean existAliveBe = true; + while ((!String.valueOf(Cloud.ClusterStatus.NORMAL).equals(clusterStatus) || !existAliveBe) && retryTime < retryTimes) { hasAutoStart = true; ++retryTime; @@ -803,6 +804,8 @@ public void waitForAutoStart(String clusterName) throws DdlException { LOG.info("change cluster sleep wait InterruptedException: ", e); } clusterStatus = getCloudStatusByName(clusterName); + // Check that the bes node in the cluster have at least one alive + existAliveBe = getBackendsByClusterName(clusterName).stream().anyMatch(Backend::isAlive); } if (retryTime >= retryTimes) { // auto start timeout @@ -815,5 +818,6 @@ public void waitForAutoStart(String clusterName) throws DdlException { if (hasAutoStart) { LOG.info("auto start cluster {}, start cost {} ms", clusterName, stopWatch.getTime()); } + return clusterName; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 1b70c5b318bd10..874e0dab17f194 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -1249,7 +1249,7 @@ public String getCloudCluster(boolean updateErr) { String choseWay = null; if (!Strings.isNullOrEmpty(this.cloudCluster)) { cluster = this.cloudCluster; - 
choseWay = "use @cluster"; + choseWay = "use context cluster"; LOG.debug("finally set context cluster name {} for user {} with chose way '{}'", cloudCluster, getCurrentUserIdentity(), choseWay); return cluster; @@ -1260,9 +1260,9 @@ public String getCloudCluster(boolean updateErr) { cluster = defaultCluster; choseWay = "default cluster"; } else { - String authorizedCluster = getAuthorizedCloudCluster(); - if (!Strings.isNullOrEmpty(authorizedCluster)) { - cluster = authorizedCluster; + CloudClusterResult cloudClusterTypeAndName = getCloudClusterByPolicy(); + if (!Strings.isNullOrEmpty(cloudClusterTypeAndName.clusterName)) { + cluster = cloudClusterTypeAndName.clusterName; choseWay = "authorized cluster"; } } @@ -1293,35 +1293,6 @@ public String getDefaultCloudCluster() { return null; } - public String getAuthorizedCloudCluster() { - List cloudClusterNames = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterNames(); - // get all available cluster of the user - for (String cloudClusterName : cloudClusterNames) { - if (!Env.getCurrentEnv().getAuth().checkCloudPriv(getCurrentUserIdentity(), - cloudClusterName, PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER)) { - continue; - } - // find a cluster has more than one alive be - List bes = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) - .getBackendsByClusterName(cloudClusterName); - AtomicBoolean hasAliveBe = new AtomicBoolean(false); - bes.stream().filter(Backend::isAlive).findAny().ifPresent(backend -> { - if (LOG.isDebugEnabled()) { - LOG.debug("get a clusterName {}, it's has more than one alive be {}", cloudClusterName, backend); - } - hasAliveBe.set(true); - }); - if (hasAliveBe.get()) { - if (LOG.isDebugEnabled()) { - LOG.debug("set context cluster name {}", cloudClusterName); - } - return cloudClusterName; - } - } - - return null; - } - public StatsErrorEstimator getStatsErrorEstimator() { return statsErrorEstimator; } diff --git 
a/regression-test/suites/cloud_p0/multi_cluster/test_auto_start.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_auto_start.groovy new file mode 100644 index 00000000000000..2ce9a9d8f4b531 --- /dev/null +++ b/regression-test/suites/cloud_p0/multi_cluster/test_auto_start.groovy @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.doris.regression.suite.ClusterOptions +import groovy.json.JsonSlurper +import groovy.json.JsonOutput +import org.awaitility.Awaitility; +import org.apache.doris.regression.util.Http +import static java.util.concurrent.TimeUnit.SECONDS; + +suite('test_auto_start_in_cloud', 'multi_cluster') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_pre_heating_time_limit_sec=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1' + ] + options.setFeNum(3) + options.setBeNum(3) + options.cloudMode = true + options.connectToFollower = true + + def getClusterFragementStatus = { def fe -> + def (feHost, feHttpPort) = fe.getHttpAddress() + // curl -X GET -u root: '128.1.1.1:8030/rest/v2/manager/cluster/cluster_info/cloud_cluster_status' + def url = 'http://' + feHost + ':' + feHttpPort + '/rest/v2/manager/cluster/cluster_info/cloud_cluster_status' + def result = Http.GET(url, true) + result + } + + + def set_cluster_status = { String unique_id , String cluster_id, String status, def ms -> + def jsonOutput = new JsonOutput() + def reqBody = [ + cloud_unique_id: unique_id, + cluster : [ + cluster_id : cluster_id, + cluster_status : status + ] + ] + def js = jsonOutput.toJson(reqBody) + log.info("drop cluster req: ${js} ".toString()) + + def set_cluster_status_api = { request_body, check_func -> + httpTest { + endpoint ms.host+':'+ms.httpPort + uri "/MetaService/http/set_cluster_status?token=greedisgood9999" + body request_body + check check_func + } + } + + set_cluster_status_api.call(js) { + respCode, body -> + log.info("set cluster status resp: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + } + + docker(options) { + sql """ + CREATE TABLE table1 ( + class INT, + id INT, + score INT SUM + ) + AGGREGATE KEY(class, id) + DISTRIBUTED BY HASH(class) BUCKETS 48 + """ + + sql """INSERT 
INTO table1 VALUES (1, 1, 100)""" + // master + def fe1 = cluster.getFeByIndex(1) + // ms + def ms = cluster.getAllMetaservices().get(0) + + def result = sql_return_maparray """SHOW CLUSTERS""" + String clusterName = result[0].cluster + def tag = getCloudBeTagByName(clusterName) + logger.info("tag = {}", tag) + + def jsonSlurper = new JsonSlurper() + def jsonObject = jsonSlurper.parseText(tag) + String cloudClusterId = jsonObject.cloud_cluster_id + String uniqueId = jsonObject.cloud_unique_id + + sleep(5 * 1000) + + Map fragmentUpdateTimeMap = [:] + + // no read,write,sc, 20s suspend cluster + boolean clusterCanSuspend = true + for (int i = 0; i < 20; i++) { + result = getClusterFragementStatus(fe1) + result.data.compute_cluster_id.each { + if (fragmentUpdateTimeMap[it.host] == null) { + fragmentUpdateTimeMap[it.host] = it.lastFragmentUpdateTime + } else if (fragmentUpdateTimeMap[it.host] != it.lastFragmentUpdateTime) { + log.info("fragment update time changed be: {} old time: {} new time: {}", it.host, fragmentUpdateTimeMap[it.host], it.lastFragmentUpdateTime) + clusterCanSuspend = false + } + } + sleep(1 * 1000) + } + assertTrue(clusterCanSuspend) + + // cloud control set cluster status SUSPENDED + set_cluster_status(uniqueId, cloudClusterId, "SUSPENDED", ms) + + dockerAwaitUntil(5) { + tag = getCloudBeTagByName(clusterName) + logger.info("tag = {}", tag) + jsonObject = jsonSlurper.parseText(tag) + String cluster_status = jsonObject.cloud_cluster_status + cluster_status == "SUSPENDED" + } + + cluster.stopBackends(1,2,3) + + // select + future1 = thread { + def begin = System.currentTimeMillis(); + // root cant resume, due to deamon thread use root + def connInfo = context.threadLocalConn.get() + result = connect(user = 'admin', password = '', url = connInfo.conn.getMetaData().getURL()) { + sql 'SELECT * FROM table1' + } + def cost = System.currentTimeMillis() - begin; + log.info("result {} time cost: {}", result, cost) + assertTrue(cost > 5000) + assertEquals(1, 
result.size()) + } + // insert + + // cloud control + future2 = thread { + // check cluster "TO_RESUME" + dockerAwaitUntil(5) { + tag = getCloudBeTagByName(clusterName) + logger.info("tag = {}", tag) + jsonObject = jsonSlurper.parseText(tag) + String cluster_status = jsonObject.cloud_cluster_status + cluster_status == "TO_RESUME" + } + sleep(5 * 1000) + cluster.startBackends(1,2,3) + set_cluster_status(uniqueId, cloudClusterId, "NORMAL", ms) + } + + future1.get() + future2.get() + } +} From 73ea8d1bfc12a111ecd4dd24aa3e84608b88c265 Mon Sep 17 00:00:00 2001 From: deardeng <565620795@qq.com> Date: Fri, 6 Sep 2024 15:08:58 +0800 Subject: [PATCH 2/2] [fix](cloud) Guard against null CloudClusterResult in getCloudCluster --- .../src/main/java/org/apache/doris/qe/ConnectContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 874e0dab17f194..7119533520d854 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -1261,7 +1261,7 @@ public String getCloudCluster(boolean updateErr) { choseWay = "default cluster"; } else { CloudClusterResult cloudClusterTypeAndName = getCloudClusterByPolicy(); - if (!Strings.isNullOrEmpty(cloudClusterTypeAndName.clusterName)) { + if (cloudClusterTypeAndName != null && !Strings.isNullOrEmpty(cloudClusterTypeAndName.clusterName)) { cluster = cloudClusterTypeAndName.clusterName; choseWay = "authorized cluster"; }