[enhancement](load) support for broker load, routine load, mysql load and add docs (#25528)

cases will be added later.
This commit is contained in:
Siyang Tang
2023-10-19 15:43:22 +08:00
committed by GitHub
parent 159be51ea6
commit 2353582493
15 changed files with 120 additions and 2 deletions

View File

@ -175,6 +175,10 @@ public class CreateRoutineLoadStmt extends DdlStmt {
private boolean numAsString = false;
private boolean fuzzyParse = false;
private String enclose;
private String escape;
/**
* support partial columns load(Only Unique Key Columns)
*/
@ -301,6 +305,14 @@ public class CreateRoutineLoadStmt extends DdlStmt {
return jsonPaths;
}
public String getEnclose() {
return enclose;
}
public String getEscape() {
return escape;
}
public String getJsonRoot() {
return jsonRoot;
}
@ -484,6 +496,14 @@ public class CreateRoutineLoadStmt extends DdlStmt {
loadToSingleTablet = Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.LOAD_TO_SINGLE_TABLET),
RoutineLoadJob.DEFAULT_LOAD_TO_SINGLE_TABLET,
LoadStmt.LOAD_TO_SINGLE_TABLET + " should be a boolean");
enclose = jobProperties.get(LoadStmt.KEY_ENCLOSE);
if (enclose != null && enclose.length() != 1) {
throw new AnalysisException("enclose must be single-char");
}
escape = jobProperties.get(LoadStmt.KEY_ESCAPE);
if (escape != null && escape.length() != 1) {
throw new AnalysisException("escape must be single-char");
}
if (ConnectContext.get() != null) {
timezone = ConnectContext.get().getSessionVariable().getTimeZone();

View File

@ -51,6 +51,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@ -865,7 +866,7 @@ public class DataDescription implements InsertStmt.DataDesc {
return;
}
String columnsSQL = "COLUMNS (" + columnDef + ")";
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(columnsSQL)));
SqlParser parser = new SqlParser(new org.apache.doris.analysis.SqlScanner(new StringReader(columnsSQL)));
ImportColumnsStmt columnsStmt;
try {
columnsStmt = (ImportColumnsStmt) SqlParserUtils.getFirstStmt(parser);
@ -1001,6 +1002,22 @@ public class DataDescription implements InsertStmt.DataDesc {
if (analysisMap.containsKey(LoadStmt.KEY_SKIP_LINES)) {
skipLines = Integer.parseInt(analysisMap.get(LoadStmt.KEY_SKIP_LINES));
}
if (analysisMap.containsKey(LoadStmt.KEY_ENCLOSE)) {
String encloseProp = analysisMap.get(LoadStmt.KEY_ENCLOSE);
if (encloseProp.length() == 1) {
enclose = encloseProp.getBytes(StandardCharsets.UTF_8)[0];
} else {
throw new AnalysisException("enclose must be single-char");
}
}
if (analysisMap.containsKey(LoadStmt.KEY_ESCAPE)) {
String escapeProp = analysisMap.get(LoadStmt.KEY_ESCAPE);
if (escapeProp.length() == 1) {
escape = escapeProp.getBytes(StandardCharsets.UTF_8)[0];
} else {
throw new AnalysisException("escape must be single-char");
}
}
}
private void checkLoadPriv(String fullDbName) throws AnalysisException {

View File

@ -125,6 +125,10 @@ public class LoadStmt extends DdlStmt {
public static final String KEY_COMMENT = "comment";
public static final String KEY_ENCLOSE = "enclose";
public static final String KEY_ESCAPE = "escape";
private final LabelName label;
private final List<DataDescription> dataDescriptions;
private final BrokerDesc brokerDesc;

View File

@ -372,6 +372,18 @@ public class MysqlLoadManager {
String trimQuotes = props.get(LoadStmt.KEY_TRIM_DOUBLE_QUOTES);
httpPut.addHeader(LoadStmt.KEY_TRIM_DOUBLE_QUOTES, trimQuotes);
}
// enclose
if (props.containsKey(LoadStmt.KEY_ENCLOSE)) {
String enclose = props.get(LoadStmt.KEY_ENCLOSE);
httpPut.addHeader(LoadStmt.KEY_ENCLOSE, enclose);
}
//escape
if (props.containsKey(LoadStmt.KEY_ESCAPE)) {
String escape = props.get(LoadStmt.KEY_ESCAPE);
httpPut.addHeader(LoadStmt.KEY_ESCAPE, escape);
}
}
// skip_lines

View File

@ -351,7 +351,6 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
this.isPartialUpdate = true;
}
jobProperties.put(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio));
if (Strings.isNullOrEmpty(stmt.getFormat()) || stmt.getFormat().equals("csv")) {
jobProperties.put(PROPS_FORMAT, "csv");
} else if (stmt.getFormat().equals("json")) {
@ -385,6 +384,12 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
} else {
jobProperties.put(PROPS_FUZZY_PARSE, "false");
}
if (stmt.getEnclose() != null) {
jobProperties.put(LoadStmt.KEY_ENCLOSE, stmt.getEnclose());
}
if (stmt.getEscape() != null) {
jobProperties.put(LoadStmt.KEY_ESCAPE, stmt.getEscape());
}
}
private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {