Add fuzzy_parse option to speed up json import (#5114)
Add a fuzzy_parse flag: if every object in the JSON file has the same keys in the same order, we only need to parse the keys of the first row; subsequent rows can then be parsed by field index instead of by key lookup, which speeds up the import.
This commit is contained in:
@ -52,6 +52,7 @@ public class AlterRoutineLoadStmt extends DdlStmt {
|
||||
.add(CreateRoutineLoadStmt.JSONROOT)
|
||||
.add(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY)
|
||||
.add(CreateRoutineLoadStmt.NUM_AS_STRING)
|
||||
.add(CreateRoutineLoadStmt.FUZZY_PARSE)
|
||||
.add(LoadStmt.STRICT_MODE)
|
||||
.add(LoadStmt.TIMEZONE)
|
||||
.build();
|
||||
@ -188,6 +189,12 @@ public class AlterRoutineLoadStmt extends DdlStmt {
|
||||
analyzedJobProperties.put(CreateRoutineLoadStmt.NUM_AS_STRING,
|
||||
String.valueOf(numAsString));
|
||||
}
|
||||
|
||||
if (jobProperties.containsKey(CreateRoutineLoadStmt.FUZZY_PARSE)) {
|
||||
boolean fuzzyParse = Boolean.valueOf(jobProperties.get(CreateRoutineLoadStmt.FUZZY_PARSE));
|
||||
analyzedJobProperties.put(CreateRoutineLoadStmt.FUZZY_PARSE,
|
||||
String.valueOf(fuzzyParse));
|
||||
}
|
||||
}
|
||||
|
||||
private void checkDataSourceProperties() throws AnalysisException {
|
||||
|
||||
@ -102,6 +102,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
|
||||
public static final String JSONPATHS = "jsonpaths";
|
||||
public static final String JSONROOT = "json_root";
|
||||
public static final String NUM_AS_STRING = "num_as_string";
|
||||
public static final String FUZZY_PARSE = "fuzzy_parse";
|
||||
|
||||
// kafka type properties
|
||||
public static final String KAFKA_BROKER_LIST_PROPERTY = "kafka_broker_list";
|
||||
@ -124,6 +125,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
|
||||
.add(JSONPATHS)
|
||||
.add(STRIP_OUTER_ARRAY)
|
||||
.add(NUM_AS_STRING)
|
||||
.add(FUZZY_PARSE)
|
||||
.add(JSONROOT)
|
||||
.add(LoadStmt.STRICT_MODE)
|
||||
.add(LoadStmt.TIMEZONE)
|
||||
@ -168,6 +170,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
|
||||
private String jsonRoot = ""; // MUST be a jsonpath string
|
||||
private boolean stripOuterArray = false;
|
||||
private boolean numAsString = false;
|
||||
private boolean fuzzyParse = false;
|
||||
|
||||
// kafka related properties
|
||||
private String kafkaBrokerList;
|
||||
@ -262,6 +265,10 @@ public class CreateRoutineLoadStmt extends DdlStmt {
|
||||
return numAsString;
|
||||
}
|
||||
|
||||
public boolean isFuzzyParse() {
|
||||
return fuzzyParse;
|
||||
}
|
||||
|
||||
public String getJsonPaths() {
|
||||
return jsonPaths;
|
||||
}
|
||||
@ -439,6 +446,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
|
||||
jsonRoot = jobProperties.get(JSONROOT);
|
||||
stripOuterArray = Boolean.valueOf(jobProperties.getOrDefault(STRIP_OUTER_ARRAY, "false"));
|
||||
numAsString = Boolean.valueOf(jobProperties.getOrDefault(NUM_AS_STRING, "false"));
|
||||
fuzzyParse = Boolean.valueOf(jobProperties.getOrDefault(FUZZY_PARSE, "false"));
|
||||
} else {
|
||||
throw new UserException("Format type is invalid. format=`" + format + "`");
|
||||
}
|
||||
|
||||
@ -116,6 +116,7 @@ public class DataDescription {
|
||||
private boolean stripOuterArray = false;
|
||||
private String jsonPaths = "";
|
||||
private String jsonRoot = "";
|
||||
private boolean fuzzyParse = false;
|
||||
|
||||
private String sequenceCol;
|
||||
|
||||
@ -477,6 +478,14 @@ public class DataDescription {
|
||||
this.stripOuterArray = stripOuterArray;
|
||||
}
|
||||
|
||||
public boolean isFuzzyParse() {
|
||||
return fuzzyParse;
|
||||
}
|
||||
|
||||
public void setFuzzyParse(boolean fuzzyParse) {
|
||||
this.fuzzyParse = fuzzyParse;
|
||||
}
|
||||
|
||||
public String getJsonPaths() {
|
||||
return jsonPaths;
|
||||
}
|
||||
|
||||
@ -107,6 +107,7 @@ public class LoadStmt extends DdlStmt {
|
||||
public static final String KEY_IN_PARAM_JSONPATHS = "jsonpaths";
|
||||
public static final String KEY_IN_PARAM_JSONROOT = "json_root";
|
||||
public static final String KEY_IN_PARAM_STRIP_OUTER_ARRAY = "strip_outer_array";
|
||||
public static final String KEY_IN_PARAM_FUZZY_PARSE = "fuzzy_parse";
|
||||
public static final String KEY_IN_PARAM_MERGE_TYPE = "merge_type";
|
||||
public static final String KEY_IN_PARAM_DELETE_CONDITION = "delete";
|
||||
public static final String KEY_IN_PARAM_FUNCTION_COLUMN = "function_column";
|
||||
|
||||
@ -271,6 +271,7 @@ public class UploadAction extends RestBaseController {
|
||||
public String stripOuterArray;
|
||||
public String jsonRoot;
|
||||
public String numAsString;
|
||||
public String fuzzyParse;
|
||||
|
||||
|
||||
public LoadContext(HttpServletRequest request, String db, String tbl, String user, String passwd, TmpFileMgr.TmpFile file) {
|
||||
@ -302,6 +303,7 @@ public class UploadAction extends RestBaseController {
|
||||
this.stripOuterArray = request.getHeader("strip_outer_array");
|
||||
this.numAsString = request.getHeader("num_as_string");
|
||||
this.jsonRoot = request.getHeader("json_root");
|
||||
this.fuzzyParse = request.getHeader("fuzzy_parse");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -99,6 +99,7 @@ public class BrokerFileGroup implements Writable {
|
||||
private boolean stripOuterArray = false;
|
||||
private String jsonPaths = "";
|
||||
private String jsonRoot = "";
|
||||
private boolean fuzzyParse = false;
|
||||
|
||||
// for unit test and edit log persistence
|
||||
private BrokerFileGroup() {
|
||||
@ -227,6 +228,7 @@ public class BrokerFileGroup implements Writable {
|
||||
stripOuterArray = dataDescription.isStripOuterArray();
|
||||
jsonPaths = dataDescription.getJsonPaths();
|
||||
jsonRoot = dataDescription.getJsonRoot();
|
||||
fuzzyParse = dataDescription.isFuzzyParse();
|
||||
}
|
||||
}
|
||||
|
||||
@ -326,6 +328,14 @@ public class BrokerFileGroup implements Writable {
|
||||
this.stripOuterArray = stripOuterArray;
|
||||
}
|
||||
|
||||
public boolean isFuzzyParse() {
|
||||
return fuzzyParse;
|
||||
}
|
||||
|
||||
public void setFuzzyParse(boolean fuzzyParse) {
|
||||
this.fuzzyParse = fuzzyParse;
|
||||
}
|
||||
|
||||
public String getJsonPaths() {
|
||||
return jsonPaths;
|
||||
}
|
||||
|
||||
@ -198,6 +198,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
private static final String PROPS_NUM_AS_STRING = "num_as_string";
|
||||
private static final String PROPS_JSONPATHS = "jsonpaths";
|
||||
private static final String PROPS_JSONROOT = "json_root";
|
||||
private static final String PROPS_FUZZY_PARSE = "fuzzy_parse";
|
||||
|
||||
protected int currentTaskConcurrentNum;
|
||||
protected RoutineLoadProgress progress;
|
||||
@ -308,6 +309,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
jobProperties.put(PROPS_NUM_AS_STRING, "false");
|
||||
jobProperties.put(PROPS_JSONPATHS, "");
|
||||
jobProperties.put(PROPS_JSONROOT, "");
|
||||
jobProperties.put(PROPS_FUZZY_PARSE, "false");
|
||||
} else if (stmt.getFormat().equals("json")) {
|
||||
jobProperties.put(PROPS_FORMAT, "json");
|
||||
if (!Strings.isNullOrEmpty(stmt.getJsonPaths())) {
|
||||
@ -330,6 +332,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
} else {
|
||||
jobProperties.put(PROPS_NUM_AS_STRING, "false");
|
||||
}
|
||||
if (stmt.isFuzzyParse()) {
|
||||
jobProperties.put(PROPS_FUZZY_PARSE, "true");
|
||||
} else {
|
||||
jobProperties.put(PROPS_FUZZY_PARSE, "false");
|
||||
}
|
||||
|
||||
} else {
|
||||
throw new UserException("Invalid format type.");
|
||||
@ -560,6 +567,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
return Boolean.valueOf(jobProperties.get(PROPS_NUM_AS_STRING));
|
||||
}
|
||||
|
||||
public boolean isFuzzyParse() {
|
||||
return Boolean.valueOf(jobProperties.get(PROPS_FUZZY_PARSE));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPath() {
|
||||
return null;
|
||||
|
||||
@ -427,6 +427,7 @@ public class BrokerScanNode extends LoadScanNode {
|
||||
rangeDesc.setStripOuterArray(context.fileGroup.isStripOuterArray());
|
||||
rangeDesc.setJsonpaths(context.fileGroup.getJsonPaths());
|
||||
rangeDesc.setJsonRoot(context.fileGroup.getJsonRoot());
|
||||
rangeDesc.setFuzzyParse(context.fileGroup.isFuzzyParse());
|
||||
}
|
||||
brokerScanRange(curLocations).addToRanges(rangeDesc);
|
||||
curFileOffset += rangeBytes;
|
||||
@ -451,6 +452,7 @@ public class BrokerScanNode extends LoadScanNode {
|
||||
rangeDesc.setStripOuterArray(context.fileGroup.isStripOuterArray());
|
||||
rangeDesc.setJsonpaths(context.fileGroup.getJsonPaths());
|
||||
rangeDesc.setJsonRoot(context.fileGroup.getJsonRoot());
|
||||
rangeDesc.setFuzzyParse(context.fileGroup.isFuzzyParse());
|
||||
}
|
||||
brokerScanRange(curLocations).addToRanges(rangeDesc);
|
||||
curFileOffset = 0;
|
||||
|
||||
@ -94,6 +94,7 @@ public class StreamLoadScanNode extends LoadScanNode {
|
||||
}
|
||||
rangeDesc.setStripOuterArray(taskInfo.isStripOuterArray());
|
||||
rangeDesc.setNumAsString(taskInfo.isNumAsString());
|
||||
rangeDesc.setFuzzyParse(taskInfo.isFuzzyParse());
|
||||
}
|
||||
rangeDesc.splittable = false;
|
||||
switch (taskInfo.getFileType()) {
|
||||
|
||||
@ -470,6 +470,7 @@ public class MultiLoadMgr {
|
||||
boolean stripOuterArray = false;
|
||||
String jsonPaths = "";
|
||||
String jsonRoot = "";
|
||||
boolean fuzzyParse = false;
|
||||
if (properties != null) {
|
||||
colString = properties.get(LoadStmt.KEY_IN_PARAM_COLUMNS);
|
||||
String columnSeparatorStr = properties.get(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR);
|
||||
@ -502,6 +503,8 @@ public class MultiLoadMgr {
|
||||
properties.getOrDefault(LoadStmt.KEY_IN_PARAM_STRIP_OUTER_ARRAY, "false"));
|
||||
jsonPaths = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_JSONPATHS, "");
|
||||
jsonRoot = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_JSONROOT, "");
|
||||
fuzzyParse = Boolean.valueOf(
|
||||
properties.getOrDefault(LoadStmt.KEY_IN_PARAM_FUZZY_PARSE, "false"));
|
||||
}
|
||||
}
|
||||
DataDescription dataDescription = new DataDescription(tbl, partitionNames, files, null, columnSeparator,
|
||||
@ -518,6 +521,7 @@ public class MultiLoadMgr {
|
||||
dataDescription.setJsonPaths(jsonPaths);
|
||||
dataDescription.setJsonRoot(jsonRoot);
|
||||
dataDescription.setStripOuterArray(stripOuterArray);
|
||||
dataDescription.setFuzzyParse(fuzzyParse);
|
||||
return dataDescription;
|
||||
}
|
||||
|
||||
|
||||
@ -42,6 +42,7 @@ public interface LoadTaskInfo {
|
||||
public String getJsonPaths();
|
||||
public String getJsonRoot();
|
||||
public boolean isStripOuterArray();
|
||||
public boolean isFuzzyParse();
|
||||
public boolean isNumAsString();
|
||||
public String getPath();
|
||||
public List<ImportColumnDesc> getColumnExprDescs();
|
||||
|
||||
@ -60,6 +60,7 @@ public class StreamLoadTask implements LoadTaskInfo {
|
||||
private boolean numAsString;
|
||||
private String jsonPaths;
|
||||
private String jsonRoot;
|
||||
private boolean fuzzyParse;
|
||||
|
||||
// optional
|
||||
private List<ImportColumnDesc> columnExprDescs = Lists.newArrayList();
|
||||
@ -85,6 +86,7 @@ public class StreamLoadTask implements LoadTaskInfo {
|
||||
this.jsonRoot = "";
|
||||
this.stripOuterArray = false;
|
||||
this.numAsString = false;
|
||||
this.fuzzyParse = false;
|
||||
}
|
||||
|
||||
public TUniqueId getId() {
|
||||
@ -148,6 +150,15 @@ public class StreamLoadTask implements LoadTaskInfo {
|
||||
return numAsString;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFuzzyParse() {
|
||||
return fuzzyParse;
|
||||
}
|
||||
|
||||
public void setFuzzyParse(boolean fuzzyParse) {
|
||||
this.fuzzyParse = fuzzyParse;
|
||||
}
|
||||
|
||||
public void setStripOuterArray(boolean stripOuterArray) {
|
||||
this.stripOuterArray = stripOuterArray;
|
||||
}
|
||||
@ -239,6 +250,7 @@ public class StreamLoadTask implements LoadTaskInfo {
|
||||
}
|
||||
stripOuterArray = request.isStripOuterArray();
|
||||
numAsString = request.isNumAsString();
|
||||
fuzzyParse = request.isFuzzyParse();
|
||||
}
|
||||
if (request.isSetMergeType()) {
|
||||
try {
|
||||
|
||||
Reference in New Issue
Block a user