From 18c62901219b02135a3cb93230340942dad677d9 Mon Sep 17 00:00:00 2001 From: w41ter Date: Tue, 6 Aug 2024 08:13:24 +0000 Subject: [PATCH] [chore](table) Add batch method to get visible version of the olap table Since get visible version is a heavy operation in the cloud mode, this PR add a batch method, to obtain all visible versions via only one RPC. --- cloud/src/meta-service/meta_service.cpp | 11 +- .../org/apache/doris/catalog/OlapTable.java | 116 +++++++++++++----- .../doris/cloud/catalog/CloudPartition.java | 17 +-- .../apache/doris/cloud/rpc/VersionHelper.java | 38 +++++- 4 files changed, 132 insertions(+), 50 deletions(-) diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index ecf054b68b6e76..0b27d6d50f6198 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -315,11 +315,14 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr return; } - size_t num_acquired = request->partition_ids_size(); + size_t num_acquired = + is_table_version ? request->table_ids_size() : request->partition_ids_size(); response->mutable_versions()->Reserve(num_acquired); response->mutable_db_ids()->CopyFrom(request->db_ids()); response->mutable_table_ids()->CopyFrom(request->table_ids()); - response->mutable_partition_ids()->CopyFrom(request->partition_ids()); + if (!is_table_version) { + response->mutable_partition_ids()->CopyFrom(request->partition_ids()); + } constexpr size_t BATCH_SIZE = 500; std::vector version_keys; @@ -327,7 +330,7 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr version_keys.reserve(BATCH_SIZE); version_values.reserve(BATCH_SIZE); while ((code == MetaServiceCode::OK || code == MetaServiceCode::KV_TXN_TOO_OLD) && - response->versions_size() < response->partition_ids_size()) { + response->versions_size() < num_acquired) { std::unique_ptr txn; TxnErrorCode err = txn_kv_->create_txn(&txn); if (err != TxnErrorCode::TXN_OK) { @@ -343,11 +346,11 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr for (size_t j = i; j < limit; j++) { int64_t db_id = request->db_ids(j); int64_t table_id = request->table_ids(j); - int64_t partition_id = request->partition_ids(j); std::string ver_key; if (is_table_version) { table_version_key({instance_id, db_id, table_id}, &ver_key); } else { + int64_t partition_id = request->partition_ids(j); partition_version_key({instance_id, db_id, table_id, partition_id}, &ver_key); } version_keys.push_back(std::move(ver_key)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 884cd4f4054e69..d7ac361d577efd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -47,9 +47,9 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.io.DeepCopy; import org.apache.doris.common.io.Text; -import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.mtmv.MTMVSnapshotIf; import org.apache.doris.mtmv.MTMVVersionSnapshot; @@ -57,7 +57,6 @@ import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; -import org.apache.doris.qe.StmtExecutor; import org.apache.doris.resource.Tag; import org.apache.doris.rpc.RpcException; import org.apache.doris.statistics.AnalysisInfo; @@ -2225,7 +2224,6 @@ public int getBaseSchemaVersion() { return baseIndexMeta.getSchemaVersion(); } - public void setEnableSingleReplicaCompaction(boolean enableSingleReplicaCompaction) { if (tableProperty == null) { tableProperty = new TableProperty(new HashMap<>()); @@ -2849,6 +2847,7 @@ public long getVisibleVersion() { if (Config.isNotCloudMode()) { return tableAttributes.getVisibleVersion(); } + // get version rpc Cloud.GetVersionRequest request = Cloud.GetVersionRequest.newBuilder() .setDbId(this.getDatabase().getId()) @@ -2858,7 +2857,7 @@ public long getVisibleVersion() { .build(); try { - Cloud.GetVersionResponse resp = getVersionFromMeta(request); + Cloud.GetVersionResponse resp = VersionHelper.getVersionFromMeta(request); long version = -1; if (resp.getStatus().getCode() == Cloud.MetaServiceCode.OK) { version = resp.getVersion(); @@ -2874,7 +2873,90 @@ public long getVisibleVersion() { } return version; } catch (RpcException e) { - throw new RuntimeException("get version from meta service failed"); + throw new RuntimeException("get version from meta service failed", e); + } + } + + // Get the table versions in batch. + public static List getVisibleVersionByTableIds(Collection tableIds) { + List tables = new ArrayList<>(); + + InternalCatalog catalog = Env.getCurrentEnv().getInternalCatalog(); + for (long tableId : tableIds) { + Table table = catalog.getTableByTableId(tableId); + if (table == null) { + throw new RuntimeException("get table visible version failed, no such table " + tableId + " exists"); + } + if (table.getType() != TableType.OLAP) { + throw new RuntimeException( + "get table visible version failed, table " + tableId + " is not a OLAP table"); + } + tables.add((OlapTable) table); + } + + return getVisibleVersionInBatch(tables); + } + + // Get the table versions in batch. + public static List getVisibleVersionInBatch(Collection tables) { + if (tables.isEmpty()) { + return new ArrayList<>(); + } + + if (Config.isNotCloudMode()) { + return tables.stream() + .map(table -> table.tableAttributes.getVisibleVersion()) + .collect(Collectors.toList()); + } + + List dbIds = new ArrayList<>(); + List tableIds = new ArrayList<>(); + for (OlapTable table : tables) { + dbIds.add(table.getDatabase().getId()); + tableIds.add(table.getId()); + } + + return getVisibleVersionFromMeta(dbIds, tableIds); + } + + private static List getVisibleVersionFromMeta(List dbIds, List tableIds) { + // get version rpc + Cloud.GetVersionRequest request = Cloud.GetVersionRequest.newBuilder() + .setDbId(-1) + .setTableId(-1) + .setPartitionId(-1) + .addAllDbIds(dbIds) + .addAllTableIds(tableIds) + .setBatchMode(true) + .setIsTableVersion(true) + .build(); + + try { + Cloud.GetVersionResponse resp = VersionHelper.getVersionFromMeta(request); + if (resp.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + throw new RpcException("get table visible version", "unexpected status " + resp.getStatus()); + } + + List versions = resp.getVersionsList(); + if (versions.size() != tableIds.size()) { + throw new RpcException("get table visible version", + "wrong number of versions, required " + tableIds.size() + ", but got " + versions.size()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("get table version from meta service, tables: {}, versions: {}", tableIds, versions); + } + + for (int i = 0; i < versions.size(); i++) { + // Set visible version to 1 if no such table version exists. + if (versions.get(i) <= 0L) { + versions.set(i, 1L); + } + } + + return versions; + } catch (RpcException e) { + throw new RuntimeException("get table version from meta service failed", e); } } @@ -2921,19 +3003,6 @@ public MTMVSnapshotIf getTableSnapshot() { return new MTMVVersionSnapshot(visibleVersion); } - private static Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionRequest req) - throws RpcException { - long startAt = System.nanoTime(); - try { - return VersionHelper.getVisibleVersion(req); - } finally { - SummaryProfile profile = getSummaryProfile(); - if (profile != null) { - profile.addGetTableVersionTime(System.nanoTime() - startAt); - } - } - } - @Override public boolean needAutoRefresh() { return true; @@ -2944,17 +3013,6 @@ public boolean isPartitionColumnAllowNull() { return true; } - private static SummaryProfile getSummaryProfile() { - ConnectContext ctx = ConnectContext.get(); - if (ctx != null) { - StmtExecutor executor = ctx.getExecutor(); - if (executor != null) { - return executor.getSummaryProfile(); - } - } - return null; - } - public void setStatistics(Statistics statistics) { this.statistics = statistics; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java index 5036a0e01c4464..b2a9751394f2d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java @@ -125,7 +125,7 @@ public long getVisibleVersion() { .build(); try { - Cloud.GetVersionResponse resp = getVersionFromMeta(request); + Cloud.GetVersionResponse resp = VersionHelper.getVersionFromMeta(request); long version = -1; if (resp.getStatus().getCode() == MetaServiceCode.OK) { version = resp.getVersion(); @@ -238,7 +238,7 @@ public static List getSnapshotVisibleVersion(List dbIds, List if (LOG.isDebugEnabled()) { LOG.debug("getVisibleVersion use CloudPartition {}", partitionIds.toString()); } - Cloud.GetVersionResponse resp = getVersionFromMeta(req); + Cloud.GetVersionResponse resp = VersionHelper.getVersionFromMeta(req); if (resp.getStatus().getCode() != MetaServiceCode.OK) { throw new RpcException("get visible version", "unexpected status " + resp.getStatus()); } @@ -339,19 +339,6 @@ public boolean hasData() { return getVisibleVersion() > Partition.PARTITION_INIT_VERSION; } - private static Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionRequest req) - throws RpcException { - long startAt = System.nanoTime(); - try { - return VersionHelper.getVisibleVersion(req); - } finally { - SummaryProfile profile = getSummaryProfile(); - if (profile != null) { - profile.addGetPartitionVersionTime(System.nanoTime() - startAt); - } - } - } - private static boolean isEmptyPartitionPruneDisabled() { ConnectContext ctx = ConnectContext.get(); if (ctx != null && (ctx.getSessionVariable().getDisableNereidsRules().get(RuleType.valueOf( diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/VersionHelper.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/VersionHelper.java index 1192d42af89202..703f8d2675cac6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/VersionHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/VersionHelper.java @@ -19,6 +19,9 @@ import org.apache.doris.cloud.proto.Cloud; import org.apache.doris.common.Config; +import org.apache.doris.common.profile.SummaryProfile; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; import org.apache.doris.rpc.RpcException; import org.apache.logging.log4j.LogManager; @@ -32,6 +35,26 @@ public class VersionHelper { private static final Logger LOG = LogManager.getLogger(VersionHelper.class); + // Call get_version() from meta service, and save the elapsed to summary profile. + public static Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionRequest req) + throws RpcException { + long startAt = System.nanoTime(); + boolean isTableVersion = req.getIsTableVersion(); + try { + return getVisibleVersion(req); + } finally { + SummaryProfile profile = getSummaryProfile(); + if (profile != null) { + long elapsed = System.nanoTime() - startAt; + if (isTableVersion) { + profile.addGetTableVersionTime(elapsed); + } else { + profile.addGetPartitionVersionTime(elapsed); + } + } + } + } + public static Cloud.GetVersionResponse getVisibleVersion(Cloud.GetVersionRequest request) throws RpcException { int tryTimes = 0; while (tryTimes++ < Config.metaServiceRpcRetryTimes()) { @@ -65,8 +88,7 @@ public static Cloud.GetVersionResponse getVisibleVersionInternal(Cloud.GetVersio long deadline = System.currentTimeMillis() + timeoutMs; Cloud.GetVersionResponse resp = null; try { - Future future = - MetaServiceProxy.getInstance().getVisibleVersionAsync(request); + Future future = MetaServiceProxy.getInstance().getVisibleVersionAsync(request); while (resp == null) { try { @@ -89,4 +111,16 @@ private static void sleepSeveralMs(int lowerMs, int upperMs) { LOG.warn("get snapshot from meta service: sleep get interrupted exception"); } } + + private static SummaryProfile getSummaryProfile() { + ConnectContext ctx = ConnectContext.get(); + if (ctx != null) { + StmtExecutor executor = ctx.getExecutor(); + if (executor != null) { + return executor.getSummaryProfile(); + } + } + return null; + } + }