@@ -18,7 +18,6 @@ package com.baidu.palo.load;
 import com.baidu.palo.common.Config;
 import com.baidu.palo.common.FeConstants;
 import com.baidu.palo.common.LoadException;
-import com.baidu.palo.common.Pair;
 import com.baidu.palo.common.util.CommandResult;
 import com.baidu.palo.common.util.Util;
 import com.baidu.palo.thrift.TEtlState;
@@ -59,6 +58,7 @@ public class DppScheduler {
     private static final String JOB_CONFIG_DIR = PALO_HOME + "/temp/job_conf";
     private static final String JOB_CONFIG_FILE = "jobconfig.json";
     private static final String LOCAL_DPP_DIR = PALO_HOME + "/lib/dpp/" + FeConstants.dpp_version;
+    private static final int DEFAULT_REDUCE_NUM = 1000;
     private static final long GB = 1024 * 1024 * 1024L;
 
     // hdfs://host:port/outputPath/dbId/loadLabel/etlOutputDir
@@ -159,20 +159,24 @@ public class DppScheduler {
                 }
             }
         }
 
-        Pair<String, Integer> inputPathAndReduceNum;
-        try {
-            inputPathAndReduceNum = getInputPathAndCalReduceNumBySize(jobConf);
-        } catch (LoadException e) {
-            failMsgs.add(e.getMessage());
-            status.setStatus_code(TStatusCode.CANCELLED);
-            return new EtlSubmitResult(status, null);
-        }
-
-
+        // create input path
+        Set<String> inputPaths = getInputPaths(jobConf);
+        String inputPath = StringUtils.join(inputPaths, " -input ");
+
+        // reduce num
+        int reduceNumByInputSize = 0;
+        try {
+            reduceNumByInputSize = calcReduceNumByInputSize(inputPaths);
+        } catch (InputSizeInvalidException e) {
+            failMsgs.add(e.getMessage());
+            status.setStatus_code(TStatusCode.CANCELLED);
+            return new EtlSubmitResult(status, null);
+        }
         int reduceNumByTablet = calcReduceNumByTablet(jobConf);
-        int reduceNum = Math.min(inputPathAndReduceNum.second, reduceNumByTablet);
+        int reduceNum = Math.min(reduceNumByInputSize, reduceNumByTablet);
         LOG.debug("calculate reduce num. reduceNum: {}, reduceNumByInputSize: {}, reduceNumByTablet: {}",
-                  reduceNum, inputPathAndReduceNum.second, reduceNumByTablet);
+                  reduceNum, reduceNumByInputSize, reduceNumByTablet);
+
         // rm path
         String outputPath = (String) jobConf.get("output_path");
@@ -180,9 +184,9 @@ public class DppScheduler {
 
         // submit etl job
         String etlJobName = String.format(ETL_JOB_NAME, dbName, loadLabel);
-        String hadoopRunCmd = String.format(HADOOP_BISTREAMING_CMD, HADOOP_CLIENT, hadoopConfig, etlJobName,
-                inputPathAndReduceNum.first, outputPath, hadoopConfig, applicationsPath, applicationsPath,
-                applicationsPath, reduceNum, configFile.getAbsolutePath());
+        String hadoopRunCmd = String.format(HADOOP_BISTREAMING_CMD, HADOOP_CLIENT, hadoopConfig, etlJobName, inputPath,
+                outputPath, hadoopConfig, applicationsPath, applicationsPath, applicationsPath, reduceNum,
+                configFile.getAbsolutePath());
         LOG.info(hadoopRunCmd);
         String outputLine = null;
         List<String> hadoopRunCmdList = Util.shellSplit(hadoopRunCmd);
@@ -325,62 +329,58 @@ public class DppScheduler {
             }
         }
     }
-
-    private Pair<String, Integer> getInputPathAndCalReduceNumBySize(Map<String, Object> jobConf) throws LoadException {
-        Map<String, Map> tables = (Map<String, Map>) jobConf.get("tables");
-        Set<String> fileUrls = new HashSet<String>();
-        for (Map<String, Map> table : tables.values()) {
-            Map<String, Map> sourceFileSchema = (Map<String, Map>) table.get("source_file_schema");
-            for (Map<String, List<String>> schema : sourceFileSchema.values()) {
-                fileUrls.addAll(schema.get("file_urls"));
-            }
-        }
-
-        String fileUrl = StringUtils.join(fileUrls, " ");
-        Set<String> inputPaths = new HashSet<String>();
-        String hadoopLsCmd = String.format(HADOOP_LS_CMD, HADOOP_CLIENT, hadoopConfig, fileUrl);
-        CommandResult lsResult = Util.executeCommand(hadoopLsCmd);
-        if (lsResult.getReturnCode() != 0) {
-            LOG.error("hadoopLsCmd: {}", hadoopLsCmd);
-            throw new LoadException("get file list from hdfs failed");
-        }
-
-        int reduceNum = 0;
-        // calc total size
-        long totalSizeB = 0L;
-        String stdout = lsResult.getStdout();
-        String[] lsFileResults = stdout.split("\n");
-        for (String line : lsFileResults) {
-            // drwxr-xr-x 3 palo palo 0 2014-12-08 14:37 /tmp/file
-            String[] fileInfos = line.split(" +");
-            if (fileInfos.length == 8) {
-                String filePath = fileInfos[fileInfos.length - 1];
-                if (inputPaths.add(filePath)) {
-                    totalSizeB += Long.parseLong(fileInfos[4]);
-                }
-            }
-        }
-
-        // check input size limit
-        int inputSizeLimitGB = Config.load_input_size_limit_gb;
-        if (inputSizeLimitGB != 0) {
-            if (totalSizeB > inputSizeLimitGB * GB) {
-                String failMsg = "Input file size[" + (float) totalSizeB / GB + "GB]"
-                        + " exceeds system limit[" + inputSizeLimitGB + "GB]";
-                LOG.warn(failMsg);
-                throw new InputSizeInvalidException(failMsg);
-            }
-        }
-
-        if (totalSizeB != 0) {
-            reduceNum = (int) (totalSizeB / Config.dpp_bytes_per_reduce) + 1;
-        }
-
-        String inputPath = StringUtils.join(inputPaths, " -input ");
-        Pair<String, Integer> inputPathAndReduceNum = new Pair<String, Integer>(inputPath, reduceNum);
-        return inputPathAndReduceNum;
-    }
-
+
+    private Set<String> getInputPaths(Map<String, Object> jobConf) {
+        Set<String> inputPaths = new HashSet<String>();
+        Map<String, Map> tables = (Map<String, Map>) jobConf.get("tables");
+        for (Map<String, Map> table : tables.values()) {
+            Map<String, Map> sourceFileSchema = (Map<String, Map>) table.get("source_file_schema");
+            for (Map<String, List<String>> schema : sourceFileSchema.values()) {
+                List<String> fileUrls = schema.get("file_urls");
+                inputPaths.addAll(fileUrls);
+            }
+        }
+        return inputPaths;
+    }
+
+    private int calcReduceNumByInputSize(Set<String> inputPaths) throws InputSizeInvalidException {
+        int reduceNum = 0;
+        String hadoopCountCmd = String.format(HADOOP_COUNT_CMD, HADOOP_CLIENT, hadoopConfig,
+                StringUtils.join(inputPaths, " "));
+        LOG.info(hadoopCountCmd);
+        CommandResult result = Util.executeCommand(hadoopCountCmd);
+        if (result.getReturnCode() != 0) {
+            LOG.warn("hadoop count error, result: {}", result);
+            return DEFAULT_REDUCE_NUM;
+        }
+
+        // calc total size
+        long totalSizeB = 0L;
+        String[] fileInfos = result.getStdout().split("\n");
+        for (String fileInfo : fileInfos) {
+            String[] fileInfoArr = fileInfo.trim().split(" +");
+            if (fileInfoArr.length == 4) {
+                totalSizeB += Long.parseLong(fileInfoArr[2]);
+            }
+        }
+
+        // check input size limit
+        int inputSizeLimitGB = Config.load_input_size_limit_gb;
+        if (inputSizeLimitGB != 0) {
+            if (totalSizeB > inputSizeLimitGB * GB) {
+                String failMsg = "Input file size[" + (float) totalSizeB / GB + "GB]"
+                        + " exceeds system limit[" + inputSizeLimitGB + "GB]";
+                LOG.warn(failMsg);
+                throw new InputSizeInvalidException(failMsg);
+            }
+        }
+
+        if (totalSizeB != 0) {
+            reduceNum = (int) (totalSizeB / Config.dpp_bytes_per_reduce) + 1;
+        }
+        return reduceNum;
+    }
+
     private int calcReduceNumByTablet(Map<String, Object> jobConf) {
         int reduceNum = 0;
         Map<String, Map> tables = (Map<String, Map>) jobConf.get("tables");
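Note: the sketch below is an editorial illustration and not part of the commit. It mimics the arithmetic behind the new calcReduceNumByInputSize()/calcReduceNumByTablet() pairing: total input bytes are read from "hadoop fs -count" style output (four columns per line: DIR_COUNT, FILE_COUNT, CONTENT_SIZE, PATHNAME), divided by a bytes-per-reduce budget, plus one, then capped by the tablet-based estimate. The sample output, the 1 GiB BYTES_PER_REDUCE stand-in for Config.dpp_bytes_per_reduce, and the tablet estimate of 5 are all made-up values.

// Illustrative sketch only; values and sample output are hypothetical.
public class ReduceNumSketch {
    // Assumed stand-in for Config.dpp_bytes_per_reduce (1 GiB per reducer).
    private static final long BYTES_PER_REDUCE = 1024L * 1024L * 1024L;

    // Sum CONTENT_SIZE (3rd column) over "hadoop fs -count" style lines,
    // then map total bytes to a reducer count, as the new diff code does.
    static int reduceNumBySize(String countOutput) {
        long totalSizeB = 0L;
        for (String line : countOutput.split("\n")) {
            String[] fields = line.trim().split(" +");
            if (fields.length == 4) {
                totalSizeB += Long.parseLong(fields[2]);
            }
        }
        return totalSizeB == 0 ? 0 : (int) (totalSizeB / BYTES_PER_REDUCE) + 1;
    }

    public static void main(String[] args) {
        String sample = "           1           12   5368709120 hdfs://host:port/user/palo/a\n"
                      + "           1            3   1073741824 hdfs://host:port/user/palo/b\n";
        int bySize = reduceNumBySize(sample);   // 6 GiB / 1 GiB + 1 = 7
        int byTablet = 5;                       // hypothetical tablet-based estimate
        // The scheduler takes the smaller of the two estimates.
        System.out.println("reduceNum = " + Math.min(bySize, byTablet)); // prints 5
    }
}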