[MT][FEATURE] Spark Load in MT from 111-rc3 to 115
Change list:
1. add broker plus manifest.yaml
2. [MT] Spark Load for MT
3. [MT] adapt MT internal Spark/Yarn commands and configurations
4. [MT] add custom properties for Spark Load ETL & delete the tmp Hive table
5. [MT] delete the Spark repository and archive & improve the ETL job log
6. [MT] feature(sparkload): support bitmap encode features in Spark Load
7. [MT] feature(sparkload parquet): disable the Parquet dictionary (an illustrative sketch follows this list)
8. feature(sparkload): support bitmap binary data from Hive in Spark Load
9. [MT] feature(sparkload): add the tolas-output dependency in SparkDpp
10. fix(spark load): resolve the args conflict between skip_null_value and map_side_join;
    refactor(spark load): refactor function names
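
For item 7, here is a minimal sketch of one way to turn off Parquet dictionary encoding in a Spark job (not necessarily how this commit implements it). The app name and output path are invented; "parquet.enable.dictionary" is the standard Parquet writer key read from the Hadoop configuration.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DisableParquetDictionary {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("spark-dpp-parquet-example")  // hypothetical app name
                .getOrCreate();

        // ParquetOutputFormat reads "parquet.enable.dictionary" from the Hadoop
        // configuration; setting it to false disables dictionary encoding for
        // every Parquet file this session writes.
        spark.sparkContext().hadoopConfiguration().set("parquet.enable.dictionary", "false");

        Dataset<Row> df = spark.range(100).toDF("id");
        df.write().mode("overwrite").parquet("/tmp/no_dict_parquet");  // example output path

        spark.stop();
    }
}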

Each commit's details are listed in this branch:
https://dev.sankuai.com/code/repo-detail/data/palo/commit/list?branch=sparkload-14-update-details
or in branch 13:
https://dev.sankuai.com/code/repo-detail/data/palo/commit/list?branch=refs%2Fheads%2F13

Some Spark Load changes from 0.15 to 1.1:
[MT][FIX][SPARKLOAD] fix a bug when partition_id exceeds the integer range in Spark Load (apache#9073)
[MT][FIX][SPARKLOAD] fix `getHashValue` of string type always being zero in Spark Load (apache#9135)
[MT][TMP][SPARKLOAD] support `custom.global.dict.table` in Spark Load
[MT][FEATURE][SPARKLOAD] support a retry strategy when getting the Spark ETL job state times out
[MT][SPARKLOAD] Hive table names start with the tmp keyword and their length should be no longer than 128
[MT][TMP][SPARKLOAD] fix min_value becoming negative when `maxGlobalDictValue` exceeds the integer range (apache#9436)

Detailed commit contents can be found in this branch:
https://dev.sankuai.com/code/repo-detail/data/palo/commit/list?branch=refs%2Fheads%2F14

[MT][TMP][FIX] fix UTs in Spark Load
[MT][FEATURE] feature(spark-dpp version): add a version file for spark-dpp
add the spark-dpp commit id as a version file when building the FE

[MT][SPARKLOAD] some fixes from 15 to 1.1 by wangbo36
1. do not connect to the Hive metastore when creating the Hive table
2. avoid the cast-from-string-to-bitmap expression
3. cast ByteBuffer to Buffer
4. handle exceptions in UTs
weixiang authored and liutang123 committed Apr 15, 2024
1 parent 90f630e commit 09c3e15
Showing 22 changed files with 536 additions and 213 deletions.
11 changes: 7 additions & 4 deletions build.sh
@@ -631,6 +631,7 @@ fi
DORIS_OUTPUT=${DORIS_OUTPUT:="${DORIS_HOME}/output/"}
echo "OUTPUT DIR=${DORIS_OUTPUT}"
mkdir -p "${DORIS_OUTPUT}"
COMMIT_ID=$(git rev-parse --short HEAD)

# Copy Frontend and Backend
if [[ "${BUILD_FE}" -eq 1 ]]; then
@@ -655,10 +656,12 @@ if [[ "${BUILD_FE}" -eq 1 ]]; then
mkdir -p "${DORIS_OUTPUT}/fe/conf/ssl"
fi

if [[ "${BUILD_SPARK_DPP}" -eq 1 ]]; then
install -d "${DORIS_OUTPUT}/fe/spark-dpp"
rm -rf "${DORIS_OUTPUT}/fe/spark-dpp"/*
cp -r -p "${DORIS_HOME}/fe/spark-dpp/target"/spark-dpp-*-jar-with-dependencies.jar "${DORIS_OUTPUT}/fe/spark-dpp"/

if [ ${BUILD_SPARK_DPP} -eq 1 ]; then
install -d ${DORIS_OUTPUT}/fe/spark-dpp/
rm -rf ${DORIS_OUTPUT}/fe/spark-dpp/*
echo ${COMMIT_ID} > ${DORIS_OUTPUT}/fe/spark-dpp/version.${COMMIT_ID}
cp -r -p ${DORIS_HOME}/fe/spark-dpp/target/spark-dpp-*-jar-with-dependencies.jar ${DORIS_OUTPUT}/fe/spark-dpp/
fi

if [[ "${OUTPUT_BE_BINARY}" -eq 1 ]]; then
19 changes: 17 additions & 2 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -17,6 +17,8 @@

package org.apache.doris.common;

import java.lang.reflect.Field;

public class Config extends ConfigBase {

@ConfField(description = {"用户自定义配置文件的路径,用于存放 fe_custom.conf。该文件中的配置会覆盖 fe.conf 中的配置",
@@ -564,7 +566,10 @@ public class Config extends ConfigBase {

@ConfField(mutable = true, masterOnly = true, description = {"Spark Load 所使用的 Spark 程序目录",
"Spark dir for Spark Load"})
public static String spark_home_default_dir = System.getenv("DORIS_HOME") + "/lib/spark2x";
public static String spark_home_default_dir = "/opt/meituan/spark-2.2";

@ConfField(mutable = true, masterOnly = true)
public static String hdfs_prefix_mt = "hdfs://dfsrouter.vip.sankuai.com:8888";

@ConfField(description = {"Spark load 所使用的依赖项目录", "Spark dependencies dir for Spark Load"})
public static String spark_resource_path = "";
@@ -573,7 +578,7 @@ public class Config extends ConfigBase {
public static String spark_launcher_log_dir = sys_log_dir + "/spark_launcher_log";

@ConfField(description = {"Yarn client 的路径", "Yarn client path"})
public static String yarn_client_path = System.getenv("DORIS_HOME") + "/lib/yarn-client/hadoop/bin/yarn";
public static String yarn_client_path = "/opt/meituan/hadoop/bin/yarn";

@ConfField(description = {"Yarn 配置文件的路径", "Yarn config path"})
public static String yarn_config_dir = System.getenv("DORIS_HOME") + "/lib/yarn-config";
@@ -2530,6 +2535,16 @@ public class Config extends ConfigBase {
+ "and the deleted labels can be reused."
})
public static int label_num_threshold = 2000;
// Do not check quota on ADD PARTITION: with a very large number of tablets, a single check holds the lock for a very long time, so it must be disabled.
// Also do not check quota on CREATE TABLE.
@ConfField(mutable = true, masterOnly = true)
public static boolean mt_disable_check_quota_when_add_partition = true;

@ConfField(mutable = true)
public static long GET_APPID_TIMEOUT_MS = 300000L; //5min

@ConfField(mutable = true)
public static long EXEC_YARNCMD_TIMEOUT_MS = 30000L; // 30s

@ConfField(description = {"指定 internal catalog 的默认鉴权类",
"Specify the default authentication class of internal catalog"},
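
The two new timeouts above are only declared here; as a hypothetical sketch (not code from this commit), they could cooperate like this when the FE polls the yarn client for the Spark ETL job's application id. It reuses Util.executeCommand and CommandResult from the Util diff further below; buildYarnStatusCmd() and parseAppId() are made-up placeholders.

// Hypothetical sketch: GET_APPID_TIMEOUT_MS bounds the whole wait for the yarn
// application id, while EXEC_YARNCMD_TIMEOUT_MS bounds each individual yarn call.
private String waitForAppId(String[] envp) throws InterruptedException, TimeoutException {
    long deadline = System.currentTimeMillis() + Config.GET_APPID_TIMEOUT_MS;
    while (System.currentTimeMillis() < deadline) {
        CommandResult result = Util.executeCommand(
                buildYarnStatusCmd(), envp, Config.EXEC_YARNCMD_TIMEOUT_MS);
        String appId = parseAppId(result);  // placeholder: extract the id from the yarn output
        if (appId != null) {
            return appId;
        }
        Thread.sleep(5000);  // back off before the next yarn invocation
    }
    throw new TimeoutException("timed out waiting for the spark etl application id");
}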
EtlJobConfig.java
@@ -20,6 +20,7 @@
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.ExclusionStrategy;
import com.google.gson.FieldAttributes;
import com.google.gson.Gson;
@@ -131,12 +132,12 @@
*/
public class EtlJobConfig implements Serializable {
// global dict
public static final String GLOBAL_DICT_TABLE_NAME = "doris_global_dict_table_%d";
public static final String DISTINCT_KEY_TABLE_NAME = "doris_distinct_key_table_%d_%s";
public static final String DORIS_INTERMEDIATE_HIVE_TABLE_NAME = "doris_intermediate_hive_table_%d_%s";
public static final String GLOBAL_DICT_TABLE_NAME = "doris_gdt_%s__%s";
public static final String DISTINCT_KEY_TABLE_NAME = "tmp_doris_dkt_%s__%s__%s";
public static final String DORIS_INTERMEDIATE_HIVE_TABLE_NAME = "tmp_doris_iht_%s__%s__%s";

// hdfsEtlPath/jobs/dbId/loadLabel/PendingTaskSignature
private static final String ETL_OUTPUT_PATH_FORMAT = "%s/jobs/%d/%s/%d";
private static final String ETL_OUTPUT_PATH_FORMAT = "%s/jobs/%s/%s/%d";
private static final String ETL_OUTPUT_FILE_NAME_DESC_V1
= "version.label.tableId.partitionId.indexId.bucket.schemaHash.parquet";
// tableId.partitionId.indexId.bucket.schemaHash
@@ -145,7 +146,10 @@ public class EtlJobConfig implements Serializable {

// dpp result
public static final String DPP_RESULT_NAME = "dpp_result.json";

@SerializedName(value = "dorisTableName")
public String dorisTableName;
@SerializedName(value = "dorisDBName")
public String dorisDBName;
@SerializedName(value = "tables")
public Map<Long, EtlTable> tables;
@SerializedName(value = "outputPath")
@@ -158,15 +162,18 @@
public EtlJobProperty properties;
@SerializedName(value = "configVersion")
public ConfigVersion configVersion;
@SerializedName(value="customizedProperties")
public Map<String, String> customizedProperties;

public EtlJobConfig(Map<Long, EtlTable> tables, String outputFilePattern, String label, EtlJobProperty properties) {
this.tables = tables;
// set outputPath when submit etl job
this.outputPath = null;
this.outputPath = "";
this.outputFilePattern = outputFilePattern;
this.label = label;
this.properties = properties;
this.configVersion = ConfigVersion.V1;
this.customizedProperties = Maps.newHashMap();
}

@Override
@@ -185,8 +192,8 @@ public String getOutputPath() {
return outputPath;
}

public static String getOutputPath(String hdfsEtlPath, long dbId, String loadLabel, long taskSignature) {
return String.format(ETL_OUTPUT_PATH_FORMAT, hdfsEtlPath, dbId, loadLabel, taskSignature);
public static String getOutputPath(String hdfsEtlPath, String dbName, String loadLabel, long taskSignature) {
return String.format(ETL_OUTPUT_PATH_FORMAT, hdfsEtlPath, dbName, loadLabel, taskSignature);
}

public static String getOutputFilePattern(String loadLabel, FilePatternVersion filePatternVersion) {
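
To illustrate the reworked path format (all values below are invented): with ETL_OUTPUT_PATH_FORMAT now "%s/jobs/%s/%s/%d", the ETL output directory is keyed by database name rather than database id.

// Illustrative only; prefix, db name, label and signature are example values.
String outputPath = EtlJobConfig.getOutputPath(
        "hdfs://nameservice1/user/doris/etl",   // hdfsEtlPath
        "example_db",                           // dbName (previously a numeric dbId)
        "label_spark_load_20240415",            // loadLabel
        12345L);                                // pending task signature
// outputPath == "hdfs://nameservice1/user/doris/etl/jobs/example_db/label_spark_load_20240415/12345"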
1 change: 1 addition & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -64,6 +64,7 @@
import org.apache.doris.analysis.FunctionName;
import org.apache.doris.analysis.InstallPluginStmt;
import org.apache.doris.analysis.ModifyDistributionClause;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.PartitionRenameClause;
import org.apache.doris.analysis.RecoverDbStmt;
import org.apache.doris.analysis.RecoverPartitionStmt;
RangePartitionInfo.java
@@ -192,6 +192,14 @@ public static void checkPartitionColumn(Column column) throws AnalysisException
}
}

public List<Map.Entry<Long, PartitionItem>> getSortedRangeMapAll() {
List<Map.Entry<Long, PartitionItem>> sortedListAll = Lists.newArrayList(idToItem.entrySet());
List<Map.Entry<Long, PartitionItem>> tmpPartitionList = Lists.newArrayList(idToTempItem.entrySet());
sortedListAll.addAll(tmpPartitionList);
Collections.sort(sortedListAll, RangeUtils.RANGE_MAP_ENTRY_COMPARATOR);
return sortedListAll;
}

@Override
public void checkPartitionItemListsMatch(List<PartitionItem> list1, List<PartitionItem> list2)
throws DdlException {
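
A hypothetical usage sketch for the new helper (not taken from this commit): iterate every range partition, formal and temporary, in range order, for example when assembling per-partition metadata for a Spark Load ETL job.

// rangePartitionInfo is an existing RangePartitionInfo instance (assumed context).
for (Map.Entry<Long, PartitionItem> entry : rangePartitionInfo.getSortedRangeMapAll()) {
    long partitionId = entry.getKey();
    PartitionItem item = entry.getValue();
    // ... build the per-partition ETL metadata for partitionId from item ...
}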
SparkResource.java
@@ -81,6 +81,7 @@ public class SparkResource extends Resource {
private static final String SPARK_CONFIG_PREFIX = "spark.";
private static final String BROKER_PROPERTY_PREFIX = "broker.";
private static final String ENV_PREFIX = "env.";
private static final String CUSTOM_PROPERTY_PREFIX = "custom.";
// spark uses hadoop configs in the form of spark.hadoop.*
private static final String SPARK_HADOOP_CONFIG_PREFIX = "spark.hadoop.";
private static final String SPARK_YARN_RESOURCE_MANAGER_ADDRESS = "spark.hadoop.yarn.resourcemanager.address";
@@ -90,6 +91,9 @@ public class SparkResource extends Resource {
private static final String SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS = "spark.hadoop.yarn.resourcemanager.ha.rm-ids";
private static final String YARN_RESOURCE_MANAGER_ADDRESS_FOMART = "spark.hadoop.yarn.resourcemanager.address.%s";
private static final String YARN_RESOURCE_MANAGER_HOSTNAME_FORMAT = "spark.hadoop.yarn.resourcemanager.hostname.%s";
private static final String HADOOP_PROXY_USER = "hadoop.proxy.user";
public static final String DPP_RESOURCE_DIR = "/spark-dpp/";
public static final String SPARK_DPP_JAR = "spark-dpp-" + Config.spark_dpp_version + "-jar-with-dependencies.jar";

public enum DeployMode {
CLUSTER,
@@ -116,25 +120,32 @@ public static DeployMode fromString(String deployMode) {
private Map<String, String> brokerProperties;
@SerializedName(value = "envConfigs")
private Map<String, String> envConfigs;
@SerializedName(value="customizedProperties")
private Map<String, String> customizedProperties;

@SerializedName(value = "hadoopProxyUser")
private String hadoopProxyUser;


public SparkResource() {
super();
}

public SparkResource(String name) {
this(name, Maps.newHashMap(), null, null, Maps.newHashMap(), Maps.newHashMap());
this(name, Maps.newHashMap(), null, null,null, Maps.newHashMap(), Maps.newHashMap(), Maps.newHashMap());
}

// "public" for testing
public SparkResource(String name, Map<String, String> sparkConfigs, String workingDir, String broker,
Map<String, String> brokerProperties, Map<String, String> envConfigs) {
public SparkResource(String name, Map<String, String> sparkConfigs, String workingDir, String broker, String hadoopProxyUser,
Map<String, String> brokerProperties, Map<String,String> envConfigs, Map<String, String> customizedProperties) {
super(name, ResourceType.SPARK);
this.sparkConfigs = sparkConfigs;
this.workingDir = workingDir;
this.broker = broker;
this.hadoopProxyUser = hadoopProxyUser;
this.brokerProperties = brokerProperties;
this.envConfigs = envConfigs;
this.customizedProperties = customizedProperties;
}

public String getMaster() {
@@ -153,6 +164,10 @@ public String getBroker() {
return broker;
}

public String getHadoopProxyUser() { return hadoopProxyUser; }

public Map<String, String> getCustomizedProperties() { return customizedProperties; }

public Map<String, String> getBrokerPropertiesWithoutPrefix() {
Map<String, String> properties = Maps.newHashMap();
for (Map.Entry<String, String> entry : brokerProperties.entrySet()) {
@@ -190,7 +205,8 @@ public Pair<String, String> getYarnResourcemanagerAddressPair() {
}

public SparkResource getCopiedResource() {
return new SparkResource(name, Maps.newHashMap(sparkConfigs), workingDir, broker, brokerProperties, envConfigs);
return new SparkResource(name, Maps.newHashMap(sparkConfigs), workingDir, broker, hadoopProxyUser,
Maps.newHashMap(brokerProperties), Maps.newHashMap(envConfigs), Maps.newHashMap());
}

@Override
@@ -268,6 +284,9 @@ private void updateProperties(Map<String, String> properties) throws DdlException
if (properties.containsKey(BROKER)) {
broker = properties.get(BROKER);
}
if (properties.containsKey(HADOOP_PROXY_USER)) {
hadoopProxyUser = properties.get(HADOOP_PROXY_USER);
}
brokerProperties.putAll(getBrokerProperties(properties));
Map<String, String> env = getEnvConfig(properties);
if (env.size() > 0) {
@@ -278,6 +297,7 @@ private void updateProperties(Map<String, String> properties) throws DdlException
}
}
LOG.info("updateProperties,{},{}", properties, envConfigs);
customizedProperties.putAll(getCustomizedProperties(properties));
}

@Override
@@ -328,10 +348,10 @@ protected void setProperties(Map<String, String> properties) throws DdlException
+ "or not turned on ha.");
}
}

// check working dir and broker
workingDir = properties.get(WORKING_DIR);
broker = properties.get(BROKER);
hadoopProxyUser = properties.get(HADOOP_PROXY_USER);
if ((workingDir == null && broker != null) || (workingDir != null && broker == null)) {
throw new DdlException("working_dir and broker should be assigned at the same time");
}
@@ -382,6 +402,16 @@ private Map<String, String> getBrokerProperties(Map<String, String> properties)
return brokerProperties;
}

private Map<String, String> getCustomizedProperties(Map<String, String> properties) {
Map<String, String> customizedProperties = Maps.newHashMap();
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().startsWith(CUSTOM_PROPERTY_PREFIX)) {
customizedProperties.put(entry.getKey(), entry.getValue());
}
}
return customizedProperties;
}

@Override
public void modifyProperties(Map<String, String> properties) throws DdlException {
updateProperties(properties);
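
A sketch of how the new hadoop.proxy.user and custom.* settings surface through the "public for testing" constructor shown above; every value below is an example, and custom.global.dict.table is the custom property mentioned earlier in this commit message.

// Sketch only: all names and values are examples.
Map<String, String> sparkConfigs = Maps.newHashMap();
sparkConfigs.put("spark.master", "yarn");
sparkConfigs.put("spark.submit.deployMode", "cluster");

Map<String, String> customized = Maps.newHashMap();
// keys keep the "custom." prefix, mirroring getCustomizedProperties()
customized.put("custom.global.dict.table", "example_db.example_dict_table");

SparkResource resource = new SparkResource(
        "spark_mt_example",                  // resource name
        sparkConfigs,
        "hdfs://nameservice1/tmp/doris",     // workingDir
        "broker0",                           // broker name
        "hadoop-doris",                      // hadoop.proxy.user
        Maps.newHashMap(),                   // broker.* properties
        Maps.newHashMap(),                   // env.* configs
        customized);                         // custom.* properties

String proxyUser = resource.getHadoopProxyUser();              // "hadoop-doris"
Map<String, String> custom = resource.getCustomizedProperties();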
Util.java
@@ -183,7 +183,10 @@ public static CommandResult executeCommand(String cmd, String[] envp, long timeo
String[] cmds = cmdList.toArray(new String[0]);

try {
Process p = Runtime.getRuntime().exec(cmds, envp);
ProcessBuilder builder = new ProcessBuilder(cmds);
builder.redirectErrorStream(true);
Process p = builder.start();

CmdWorker cmdWorker = new CmdWorker(p);
cmdWorker.start();

@@ -195,7 +198,7 @@
// if we get this far then we never got an exit value from the worker thread
// as a result of a timeout
LOG.warn("exec command [{}] timed out.", cmd);
exitValue = -1;
exitValue = -2;
}
} catch (InterruptedException ex) {
cmdWorker.interrupt();
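
The diff above swaps Runtime.exec for ProcessBuilder with redirectErrorStream(true), so stderr is merged into stdout and one reader can drain both without either pipe filling up and blocking the child; a timeout is now reported as exit value -2. Below is a standalone sketch of the same pattern, using Process.waitFor with a timeout instead of the CmdWorker helper.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

public class ProcessBuilderTimeoutDemo {
    public static int run(String[] cmd, long timeoutMs) throws Exception {
        ProcessBuilder builder = new ProcessBuilder(cmd);
        builder.redirectErrorStream(true);  // merge stderr into stdout
        Process p = builder.start();

        StringBuilder output = new StringBuilder();
        // Drain the merged output on a separate thread so waitFor() below can
        // time out even if the child keeps writing.
        Thread drainer = new Thread(() -> {
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    output.append(line).append('\n');
                }
            } catch (Exception ignored) {
                // stream closed because the process exited or was killed
            }
        });
        drainer.start();

        if (!p.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
            p.destroyForcibly();
            return -2;  // timed out, mirroring the patched executeCommand
        }
        drainer.join();
        System.out.print(output);
        return p.exitValue();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("exit=" + run(new String[] {"echo", "hello"}, 5000L));
    }
}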
(remaining file diffs not shown)
