revert dpp scheduler #38

Merged 1 commit on Aug 21, 2017
144 changes: 72 additions & 72 deletions fe/src/com/baidu/palo/load/DppScheduler.java
@@ -18,7 +18,6 @@
import com.baidu.palo.common.Config;
import com.baidu.palo.common.FeConstants;
import com.baidu.palo.common.LoadException;
import com.baidu.palo.common.Pair;
import com.baidu.palo.common.util.CommandResult;
import com.baidu.palo.common.util.Util;
import com.baidu.palo.thrift.TEtlState;
@@ -59,6 +58,7 @@ public class DppScheduler {
private static final String JOB_CONFIG_DIR = PALO_HOME + "/temp/job_conf";
private static final String JOB_CONFIG_FILE = "jobconfig.json";
private static final String LOCAL_DPP_DIR = PALO_HOME + "/lib/dpp/" + FeConstants.dpp_version;
private static final int DEFAULT_REDUCE_NUM = 1000;
private static final long GB = 1024 * 1024 * 1024L;

// hdfs://host:port/outputPath/dbId/loadLabel/etlOutputDir
@@ -159,30 +159,34 @@ public EtlSubmitResult submitEtlJob(long jobId, String loadLabel, String cluster
}
}
}

Pair<String, Integer> inputPathAndReduceNum;
try {
inputPathAndReduceNum = getInputPathAndCalReduceNumBySize(jobConf);
} catch (LoadException e) {
failMsgs.add(e.getMessage());
status.setStatus_code(TStatusCode.CANCELLED);
return new EtlSubmitResult(status, null);
}


// create input path
Set<String> inputPaths = getInputPaths(jobConf);
String inputPath = StringUtils.join(inputPaths, " -input ");

// reduce num
int reduceNumByInputSize = 0;
try {
reduceNumByInputSize = calcReduceNumByInputSize(inputPaths);
} catch (InputSizeInvalidException e) {
failMsgs.add(e.getMessage());
status.setStatus_code(TStatusCode.CANCELLED);
return new EtlSubmitResult(status, null);
}
int reduceNumByTablet = calcReduceNumByTablet(jobConf);
int reduceNum = Math.min(inputPathAndReduceNum.second, reduceNumByTablet);
int reduceNum = Math.min(reduceNumByInputSize, reduceNumByTablet);
LOG.debug("calculate reduce num. reduceNum: {}, reduceNumByInputSize: {}, reduceNumByTablet: {}",
reduceNum, inputPathAndReduceNum.second, reduceNumByTablet);
reduceNum, reduceNumByInputSize, reduceNumByTablet);

// rm path
String outputPath = (String) jobConf.get("output_path");
deleteEtlOutputPath(outputPath);

// submit etl job
String etlJobName = String.format(ETL_JOB_NAME, dbName, loadLabel);
String hadoopRunCmd = String.format(HADOOP_BISTREAMING_CMD, HADOOP_CLIENT, hadoopConfig, etlJobName,
inputPathAndReduceNum.first, outputPath, hadoopConfig, applicationsPath, applicationsPath,
applicationsPath, reduceNum, configFile.getAbsolutePath());
String hadoopRunCmd = String.format(HADOOP_BISTREAMING_CMD, HADOOP_CLIENT, hadoopConfig, etlJobName, inputPath,
outputPath, hadoopConfig, applicationsPath, applicationsPath, applicationsPath, reduceNum,
configFile.getAbsolutePath());
LOG.info(hadoopRunCmd);
String outputLine = null;
List<String> hadoopRunCmdList = Util.shellSplit(hadoopRunCmd);
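
Note on the reduce-number selection in this hunk: the job takes the smaller of the size-based and tablet-based estimates. A minimal standalone sketch of that logic, with BYTES_PER_REDUCE as an assumed stand-in for Config.dpp_bytes_per_reduce (class name and sample values are illustrative only):

```java
// Illustrative sketch only: the final reduce count is the minimum of the
// size-based and tablet-based estimates computed in submitEtlJob above.
public class ReduceNumSketch {
    // Assumed stand-in for Config.dpp_bytes_per_reduce, not the real config value.
    private static final long BYTES_PER_REDUCE = 2L * 1024 * 1024 * 1024;

    static int chooseReduceNum(long totalInputBytes, int reduceNumByTablet) {
        int reduceNumByInputSize = 0;
        if (totalInputBytes != 0) {
            reduceNumByInputSize = (int) (totalInputBytes / BYTES_PER_REDUCE) + 1;
        }
        // The scheduler keeps whichever estimate is smaller.
        return Math.min(reduceNumByInputSize, reduceNumByTablet);
    }

    public static void main(String[] args) {
        // 10 GB of input suggests 6 reduces; 3 tablets caps it at 3.
        System.out.println(chooseReduceNum(10L * 1024 * 1024 * 1024, 3)); // prints 3
    }
}
```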
@@ -325,62 +329,58 @@ private void prepareDppApplications() throws LoadException {
}
}
}

private Pair<String, Integer> getInputPathAndCalReduceNumBySize(Map<String, Object> jobConf) throws LoadException {
Map<String, Map> tables = (Map<String, Map>) jobConf.get("tables");
Set<String> fileUrls = new HashSet<String>();
for (Map<String, Map> table : tables.values()) {
Map<String, Map> sourceFileSchema = (Map<String, Map>) table.get("source_file_schema");
for (Map<String, List<String>> schema : sourceFileSchema.values()) {
fileUrls.addAll(schema.get("file_urls"));
}
}

String fileUrl = StringUtils.join(fileUrls, " ");
Set<String> inputPaths = new HashSet<String>();
String hadoopLsCmd = String.format(HADOOP_LS_CMD, HADOOP_CLIENT, hadoopConfig, fileUrl);
CommandResult lsResult = Util.executeCommand(hadoopLsCmd);
if (lsResult.getReturnCode() != 0) {
LOG.error("hadoopLsCmd: {}", hadoopLsCmd);
throw new LoadException("get file list from hdfs failed");
}

int reduceNum = 0;
// calc total size
long totalSizeB = 0L;
String stdout = lsResult.getStdout();
String[] lsFileResults = stdout.split("\n");
for (String line : lsFileResults) {
// drwxr-xr-x 3 palo palo 0 2014-12-08 14:37 /tmp/file
String[] fileInfos = line.split(" +");
if (fileInfos.length == 8) {
String filePath = fileInfos[fileInfos.length - 1];
if (inputPaths.add(filePath)) {
totalSizeB += Long.parseLong(fileInfos[4]);
}
}
}

// check input size limit
int inputSizeLimitGB = Config.load_input_size_limit_gb;
if (inputSizeLimitGB != 0) {
if (totalSizeB > inputSizeLimitGB * GB) {
String failMsg = "Input file size[" + (float) totalSizeB / GB + "GB]"
+ " exceeds system limit[" + inputSizeLimitGB + "GB]";
LOG.warn(failMsg);
throw new InputSizeInvalidException(failMsg);
}
}

if (totalSizeB != 0) {
reduceNum = (int) (totalSizeB / Config.dpp_bytes_per_reduce) + 1;
}

String inputPath = StringUtils.join(inputPaths, " -input ");
Pair<String, Integer> inputPathAndReduceNum = new Pair<String, Integer>(inputPath, reduceNum);
return inputPathAndReduceNum;
}
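
Side note on the removed helper above: it derives the total input size by shelling out to `hadoop fs -ls` and summing the size column of each 8-field output line, deduplicating paths along the way. A minimal sketch of that parsing, assuming the standard ls output format shown in the inline comment (class name and sample lines are illustrative):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the parsing done by the removed getInputPathAndCalReduceNumBySize:
// each `hadoop fs -ls` line splits into 8 fields, the 5th field is the size in
// bytes, and the last field is the path.
public class LsOutputSketch {
    static long sumFileSizes(String lsStdout, Set<String> inputPaths) {
        long totalSizeB = 0L;
        for (String line : lsStdout.split("\n")) {
            // e.g. "-rw-r--r-- 3 palo palo 1048576 2014-12-08 14:37 /tmp/file"
            String[] fields = line.split(" +");
            if (fields.length == 8) {
                String filePath = fields[fields.length - 1];
                if (inputPaths.add(filePath)) {              // skip duplicate paths
                    totalSizeB += Long.parseLong(fields[4]); // size column
                }
            }
        }
        return totalSizeB;
    }

    public static void main(String[] args) {
        String stdout = "-rw-r--r-- 3 palo palo 1048576 2014-12-08 14:37 /tmp/a\n"
                + "-rw-r--r-- 3 palo palo 2097152 2014-12-08 14:37 /tmp/b";
        System.out.println(sumFileSizes(stdout, new HashSet<String>())); // 3145728
    }
}
```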


private Set<String> getInputPaths(Map<String, Object> jobConf) {
Set<String> inputPaths = new HashSet<String>();
Map<String, Map> tables = (Map<String, Map>) jobConf.get("tables");
for (Map<String, Map> table : tables.values()) {
Map<String, Map> sourceFileSchema = (Map<String, Map>) table.get("source_file_schema");
for (Map<String, List<String>> schema : sourceFileSchema.values()) {
List<String> fileUrls = schema.get("file_urls");
inputPaths.addAll(fileUrls);
}
}
return inputPaths;
}

private int calcReduceNumByInputSize(Set<String> inputPaths) throws InputSizeInvalidException {
int reduceNum = 0;
String hadoopCountCmd = String.format(HADOOP_COUNT_CMD, HADOOP_CLIENT, hadoopConfig,
StringUtils.join(inputPaths, " "));
LOG.info(hadoopCountCmd);
CommandResult result = Util.executeCommand(hadoopCountCmd);
if (result.getReturnCode() != 0) {
LOG.warn("hadoop count error, result: {}", result);
return DEFAULT_REDUCE_NUM;
}

// calc total size
long totalSizeB = 0L;
String[] fileInfos = result.getStdout().split("\n");
for (String fileInfo : fileInfos) {
String[] fileInfoArr = fileInfo.trim().split(" +");
if (fileInfoArr.length == 4) {
totalSizeB += Long.parseLong(fileInfoArr[2]);
}
}

// check input size limit
int inputSizeLimitGB = Config.load_input_size_limit_gb;
if (inputSizeLimitGB != 0) {
if (totalSizeB > inputSizeLimitGB * GB) {
String failMsg = "Input file size[" + (float) totalSizeB / GB + "GB]"
+ " exceeds system limit[" + inputSizeLimitGB + "GB]";
LOG.warn(failMsg);
throw new InputSizeInvalidException(failMsg);
}
}

if (totalSizeB != 0) {
reduceNum = (int) (totalSizeB / Config.dpp_bytes_per_reduce) + 1;
}
return reduceNum;
}
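
The added calcReduceNumByInputSize instead shells out to `hadoop count`, whose output is assumed here to be the usual four columns (DIR_COUNT, FILE_COUNT, CONTENT_SIZE, PATHNAME); the third column is summed, and a failed command falls back to DEFAULT_REDUCE_NUM. A hedged sketch of the parsing and the GB conversion (class name and sample values are illustrative):

```java
// Sketch of the `hadoop count` parsing added above: split each output line,
// keep 4-column lines, and sum the CONTENT_SIZE column.
public class CountOutputSketch {
    private static final long GB = 1024 * 1024 * 1024L;
    private static final int DEFAULT_REDUCE_NUM = 1000; // fallback when the count command fails

    static long sumContentSize(String countStdout) {
        long totalSizeB = 0L;
        for (String line : countStdout.split("\n")) {
            String[] fields = line.trim().split(" +");
            if (fields.length == 4) {
                totalSizeB += Long.parseLong(fields[2]); // CONTENT_SIZE column
            }
        }
        return totalSizeB;
    }

    public static void main(String[] args) {
        String stdout = "           2           10          5368709120 hdfs://host:port/tmp/load1\n"
                + "           1            3          1073741824 hdfs://host:port/tmp/load2";
        long totalSizeB = sumContentSize(stdout);
        System.out.println(totalSizeB / (float) GB + " GB"); // prints 6.0 GB
    }
}
```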

private int calcReduceNumByTablet(Map<String, Object> jobConf) {
int reduceNum = 0;
Map<String, Map> tables = (Map<String, Map>) jobConf.get("tables");