[Refactor] simplify some code in routine load (#9532)
This commit is contained in:
@ -461,8 +461,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
|
||||
format = ""; // if it's not json, then it's mean csv and set empty
|
||||
} else if (format.equalsIgnoreCase("json")) {
|
||||
format = "json";
|
||||
jsonPaths = jobProperties.get(JSONPATHS);
|
||||
jsonRoot = jobProperties.get(JSONROOT);
|
||||
jsonPaths = jobProperties.getOrDefault(JSONPATHS, "");
|
||||
jsonRoot = jobProperties.getOrDefault(JSONROOT, "");
|
||||
stripOuterArray = Boolean.valueOf(jobProperties.getOrDefault(STRIP_OUTER_ARRAY, "false"));
|
||||
numAsString = Boolean.valueOf(jobProperties.getOrDefault(NUM_AS_STRING, "false"));
|
||||
fuzzyParse = Boolean.valueOf(jobProperties.getOrDefault(FUZZY_PARSE, "false"));
|
||||
|
||||
@ -299,42 +299,37 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
|
||||
if (Strings.isNullOrEmpty(stmt.getFormat()) || stmt.getFormat().equals("csv")) {
|
||||
jobProperties.put(PROPS_FORMAT, "csv");
|
||||
jobProperties.put(PROPS_STRIP_OUTER_ARRAY, "false");
|
||||
jobProperties.put(PROPS_NUM_AS_STRING, "false");
|
||||
jobProperties.put(PROPS_JSONPATHS, "");
|
||||
jobProperties.put(PROPS_JSONROOT, "");
|
||||
jobProperties.put(PROPS_FUZZY_PARSE, "false");
|
||||
} else if (stmt.getFormat().equals("json")) {
|
||||
jobProperties.put(PROPS_FORMAT, "json");
|
||||
if (!Strings.isNullOrEmpty(stmt.getJsonPaths())) {
|
||||
jobProperties.put(PROPS_JSONPATHS, stmt.getJsonPaths());
|
||||
} else {
|
||||
jobProperties.put(PROPS_JSONPATHS, "");
|
||||
}
|
||||
if (!Strings.isNullOrEmpty(stmt.getJsonRoot())) {
|
||||
jobProperties.put(PROPS_JSONROOT, stmt.getJsonRoot());
|
||||
} else {
|
||||
jobProperties.put(PROPS_JSONROOT, "");
|
||||
}
|
||||
if (stmt.isStripOuterArray()) {
|
||||
jobProperties.put(PROPS_STRIP_OUTER_ARRAY, "true");
|
||||
} else {
|
||||
jobProperties.put(PROPS_STRIP_OUTER_ARRAY, "false");
|
||||
}
|
||||
if (stmt.isNumAsString()) {
|
||||
jobProperties.put(PROPS_NUM_AS_STRING, "true");
|
||||
} else {
|
||||
jobProperties.put(PROPS_NUM_AS_STRING, "false");
|
||||
}
|
||||
if (stmt.isFuzzyParse()) {
|
||||
jobProperties.put(PROPS_FUZZY_PARSE, "true");
|
||||
} else {
|
||||
jobProperties.put(PROPS_FUZZY_PARSE, "false");
|
||||
}
|
||||
|
||||
} else {
|
||||
throw new UserException("Invalid format type.");
|
||||
}
|
||||
|
||||
if (!Strings.isNullOrEmpty(stmt.getJsonPaths())) {
|
||||
jobProperties.put(PROPS_JSONPATHS, stmt.getJsonPaths());
|
||||
} else {
|
||||
jobProperties.put(PROPS_JSONPATHS, "");
|
||||
}
|
||||
if (!Strings.isNullOrEmpty(stmt.getJsonRoot())) {
|
||||
jobProperties.put(PROPS_JSONROOT, stmt.getJsonRoot());
|
||||
} else {
|
||||
jobProperties.put(PROPS_JSONROOT, "");
|
||||
}
|
||||
if (stmt.isStripOuterArray()) {
|
||||
jobProperties.put(PROPS_STRIP_OUTER_ARRAY, "true");
|
||||
} else {
|
||||
jobProperties.put(PROPS_STRIP_OUTER_ARRAY, "false");
|
||||
}
|
||||
if (stmt.isNumAsString()) {
|
||||
jobProperties.put(PROPS_NUM_AS_STRING, "true");
|
||||
} else {
|
||||
jobProperties.put(PROPS_NUM_AS_STRING, "false");
|
||||
}
|
||||
if (stmt.isFuzzyParse()) {
|
||||
jobProperties.put(PROPS_FUZZY_PARSE, "true");
|
||||
} else {
|
||||
jobProperties.put(PROPS_FUZZY_PARSE, "false");
|
||||
}
|
||||
}
|
||||
|
||||
private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {
|
||||
@ -342,10 +337,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
columnDescs = new ImportColumnDescs();
|
||||
if (routineLoadDesc.getColumnsInfo() != null) {
|
||||
ImportColumnsStmt columnsStmt = routineLoadDesc.getColumnsInfo();
|
||||
if (columnsStmt.getColumns() != null || columnsStmt.getColumns().size() != 0) {
|
||||
for (ImportColumnDesc columnDesc : columnsStmt.getColumns()) {
|
||||
columnDescs.descs.add(columnDesc);
|
||||
}
|
||||
if (columnsStmt.getColumns() != null) {
|
||||
columnDescs.descs.addAll(columnsStmt.getColumns());
|
||||
}
|
||||
}
|
||||
if (routineLoadDesc.getPrecedingFilter() != null) {
|
||||
@ -1316,7 +1309,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
|
||||
public List<List<String>> getTasksShowInfo() {
|
||||
List<List<String>> rows = Lists.newArrayList();
|
||||
routineLoadTaskInfoList.stream().forEach(entity -> {
|
||||
routineLoadTaskInfoList.forEach(entity -> {
|
||||
try {
|
||||
entity.setTxnStatus(Catalog.getCurrentCatalog().getGlobalTransactionMgr().getDatabaseTransactionMgr(dbId).getTransactionState(entity.getTxnId()).getTransactionStatus());
|
||||
rows.add(entity.getTaskShowInfo());
|
||||
@ -1481,10 +1474,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
return false;
|
||||
}
|
||||
Preconditions.checkState(endTimestamp != -1, endTimestamp);
|
||||
if ((System.currentTimeMillis() - endTimestamp) > Config.label_keep_max_second * 1000) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
return (System.currentTimeMillis() - endTimestamp) > Config.label_keep_max_second * 1000;
|
||||
}
|
||||
|
||||
public boolean isFinal() {
|
||||
|
||||
Reference in New Issue
Block a user