[feature-wip](parquet-vec) Support parquet scanner in vectorized engine (#9433)
@@ -186,6 +186,23 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneable
         public boolean apply(Expr arg) { return arg instanceof NullLiteral; }
     };
 
+    public static final com.google.common.base.Predicate<Expr> IS_VARCHAR_SLOT_REF_IMPLICIT_CAST =
+            new com.google.common.base.Predicate<Expr>() {
+                @Override
+                public boolean apply(Expr arg) {
+                    // exclude explicit cast. for example: cast(k1 as date)
+                    if (!arg.isImplicitCast()) {
+                        return false;
+                    }
+                    List<Expr> children = arg.getChildren();
+                    if (children.isEmpty()) {
+                        return false;
+                    }
+                    Expr child = children.get(0);
+                    return child instanceof SlotRef && child.getType().isVarchar();
+                }
+            };
+
     public void setSelectivity() {
         selectivity = -1;
     }
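This predicate pairs with Expr.collect, which replaceVarcharWithCastType in Load.java (further down) uses to gather implicit casts. A minimal usage sketch, assuming an already-analyzed expr; names mirror the diff:

    // Collect only implicit casts whose first child is a VARCHAR SlotRef;
    // an explicit cast(k1 as date) is skipped because isImplicitCast() is false.
    List<CastExpr> casts = Lists.newArrayList();
    expr.collect(Expr.IS_VARCHAR_SLOT_REF_IMPLICIT_CAST, casts);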
@@ -105,6 +105,21 @@ public class TupleDescriptor {
         return slots;
     }
 
+    /**
+     * get slot desc by slot id.
+     *
+     * @param slotId slot id
+     * @return the slot's descriptor, or null if no slot matches
+     */
+    public SlotDescriptor getSlot(int slotId) {
+        for (SlotDescriptor slotDesc : slots) {
+            if (slotDesc.getId().asInt() == slotId) {
+                return slotDesc;
+            }
+        }
+        return null;
+    }
+
     public void setCardinality(long cardinality) {
         this.cardinality = cardinality;
     }
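The new accessor is how replaceVarcharWithCastType below resolves source slots. A minimal sketch of the lookup, assuming srcTupleDesc and an analyzed slotRef as in that hunk:

    int slotId = slotRef.getSlotId().asInt();
    SlotDescriptor srcSlotDesc = srcTupleDesc.getSlot(slotId);
    if (srcSlotDesc == null) {
        throw new UserException("Unknown source slot descriptor. id: " + slotId);
    }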
@@ -192,6 +192,10 @@ public abstract class Type {
                 || isScalarType(PrimitiveType.STRING);
     }
 
+    public boolean isVarchar() {
+        return isScalarType(PrimitiveType.VARCHAR);
+    }
+
     // only metric types have the following constraint:
     // 1. don't support as key column
     // 2. don't support filter
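A quick sketch of the helper's behavior: it matches the VARCHAR primitive only, so CHAR and STRING scalars (seen in the check just above) return false:

    Type t = ScalarType.createType(PrimitiveType.VARCHAR);
    t.isVarchar();                                            // true
    ScalarType.createType(PrimitiveType.STRING).isVarchar();  // false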
@@ -95,6 +95,7 @@ import org.apache.doris.task.LoadTaskInfo;
 import org.apache.doris.task.PushTask;
 import org.apache.doris.thrift.TBrokerScanRangeParams;
 import org.apache.doris.thrift.TEtlState;
+import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TMiniLoadRequest;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPriority;
@@ -930,7 +931,7 @@ public class Load {
      */
     public static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
             Map<String, Pair<String, List<String>>> columnToHadoopFunction) throws UserException {
-        initColumns(tbl, columnExprs, columnToHadoopFunction, null, null, null, null, null, false);
+        initColumns(tbl, columnExprs, columnToHadoopFunction, null, null, null, null, null, null, false, false);
     }
 
     /*
@@ -940,10 +941,11 @@ public class Load {
     public static void initColumns(Table tbl, LoadTaskInfo.ImportColumnDescs columnDescs,
             Map<String, Pair<String, List<String>>> columnToHadoopFunction,
             Map<String, Expr> exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc,
-            Map<String, SlotDescriptor> slotDescByName, TBrokerScanRangeParams params) throws UserException {
+            Map<String, SlotDescriptor> slotDescByName, TBrokerScanRangeParams params,
+            TFileFormatType formatType, boolean useVectorizedLoad) throws UserException {
         rewriteColumns(columnDescs);
         initColumns(tbl, columnDescs.descs, columnToHadoopFunction, exprsByName, analyzer,
-                srcTupleDesc, slotDescByName, params, true);
+                srcTupleDesc, slotDescByName, params, formatType, useVectorizedLoad, true);
     }
 
     /*
@@ -958,6 +960,7 @@ public class Load {
             Map<String, Pair<String, List<String>>> columnToHadoopFunction,
             Map<String, Expr> exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc,
             Map<String, SlotDescriptor> slotDescByName, TBrokerScanRangeParams params,
+            TFileFormatType formatType, boolean useVectorizedLoad,
             boolean needInitSlotAndAnalyzeExprs) throws UserException {
         // We make a copy of the columnExprs so that our subsequent changes
         // to the columnExprs will not affect the original columnExprs.
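Call sites (BrokerScanNode and StreamLoadScanNode, further down) supply the two new arguments. A condensed sketch of a call under the widened contract, with identifiers as they appear in this diff:

    Load.initColumns(dstTable, columnExprDescs, null /* no hadoop function */,
            exprsByName, analyzer, srcTupleDesc, slotDescByName, params,
            taskInfo.getFormatType(),        // new: formatType
            VectorizedUtil.isVectorized());  // new: useVectorizedLoad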
@@ -1043,30 +1046,70 @@ public class Load {
         if (!needInitSlotAndAnalyzeExprs) {
             return;
         }
 
+        Set<String> exprSrcSlotName = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
+        for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
+            if (importColumnDesc.isColumn()) {
+                continue;
+            }
+            List<SlotRef> slots = Lists.newArrayList();
+            importColumnDesc.getExpr().collect(SlotRef.class, slots);
+            for (SlotRef slot : slots) {
+                String slotColumnName = slot.getColumnName();
+                exprSrcSlotName.add(slotColumnName);
+            }
+        }
+        // excludedColumns holds the columns that must keep varchar type
+        Set<String> excludedColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
         // init slot desc and expr map, also transform hadoop functions
         for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
             // make column name case match with real column name
             String columnName = importColumnDesc.getColumnName();
+            Column tblColumn = tbl.getColumn(columnName);
             String realColName;
-            if (tbl.getColumn(columnName) == null || importColumnDesc.getExpr() == null) {
+            if (tblColumn == null || tblColumn.getName() == null || importColumnDesc.getExpr() == null) {
                 realColName = columnName;
             } else {
-                realColName = tbl.getColumn(columnName).getName();
+                realColName = tblColumn.getName();
             }
             if (importColumnDesc.getExpr() != null) {
                 Expr expr = transformHadoopFunctionExpr(tbl, realColName, importColumnDesc.getExpr());
                 exprsByName.put(realColName, expr);
             } else {
                 SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc);
-                slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+                // only support parquet format now
+                if (useVectorizedLoad && formatType == TFileFormatType.FORMAT_PARQUET
+                        && tblColumn != null) {
+                    // in vectorized load,
+                    // example: k1 is DATETIME in the source file and INT in the schema, and the mapping expr is k1=year(k1);
+                    // we cannot determine whether to use the type in the schema or the type inferred from the expr,
+                    // so use varchar type as before
+                    if (exprSrcSlotName.contains(columnName)) {
+                        // columns in expr args should be varchar type
+                        slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+                        slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR));
+                        excludedColumns.add(realColName);
+                        // example: k1, k2 = k1 + 1, where k1 is not nullable and k2 is nullable;
+                        // we cannot determine whether columns in expr args are nullable,
+                        // so slots in expr args stay nullable as before
+                        slotDesc.setIsNullable(true);
+                    } else {
+                        // columns from files such as parquet can be parsed using the type in the table schema
+                        slotDesc.setType(tblColumn.getType());
+                        slotDesc.setColumn(new Column(realColName, tblColumn.getType()));
+                        // non-nullable columns are allowed in vectorized load with parquet format
+                        slotDesc.setIsNullable(tblColumn.isAllowNull());
+                    }
+                } else {
+                    // columns default to varchar type
+                    slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+                    slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR));
+                    // ISSUE A: the src slot should be nullable even if the column is not nullable,
+                    // because the src slot is what we read from the file; it does not represent the real column value.
+                    // If the column is not nullable, an error will be thrown when filling the dest slot,
+                    // which is not nullable.
+                    slotDesc.setIsNullable(true);
+                }
                 slotDesc.setIsMaterialized(true);
-                // ISSUE A: src slot should be nullable even if the column is not nullable.
-                // because src slot is what we read from file, not represent to real column value.
-                // If column is not nullable, error will be thrown when filling the dest slot,
-                // which is not nullable.
-                slotDesc.setIsNullable(true);
-                slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR));
                 params.addToSrcSlotIds(slotDesc.getId().asInt());
                 slotDescByName.put(realColName, slotDesc);
             }
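To make the three branches concrete, a hypothetical trace for a vectorized parquet load into a table (k1 INT NOT NULL, k2 INT) with mapping k2 = k1 + 1; column names are illustrative:

    // k1: referenced by the mapping expr "k1 + 1" -> first branch: its src slot
    //     stays VARCHAR, is forced nullable, and k1 joins excludedColumns.
    // k2: defined by an expr -> no src slot here; its expr goes to exprsByName
    //     and is analyzed later.
    // a column read directly from the file -> second branch: the src slot takes
    //     the schema type and schema nullability (tblColumn.isAllowNull()).
    // non-vectorized or non-parquet load -> else branch: VARCHAR and nullable,
    //     as before (ISSUE A).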
@@ -1085,7 +1128,30 @@ public class Load {
         }
 
         LOG.debug("slotDescByName: {}, exprsByName: {}, mvDefineExpr: {}", slotDescByName, exprsByName, mvDefineExpr);
+        // we only support parquet format now:
+        // use implicit deduction to convert columns that are not in the doris table
+        // from varchar to a more appropriate type
+        if (useVectorizedLoad && formatType == TFileFormatType.FORMAT_PARQUET) {
+            // analyze all exprs
+            Map<String, Expr> cloneExprsByName = Maps.newHashMap(exprsByName);
+            Map<String, Expr> cloneMvDefineExpr = Maps.newHashMap(mvDefineExpr);
+            analyzeAllExprs(tbl, analyzer, cloneExprsByName, cloneMvDefineExpr, slotDescByName, useVectorizedLoad);
+            // for columns that only exist in mapping expr args, replace the type with the one inferred from exprs;
+            // if there is more than one candidate, choose the last non-varchar type.
+            // for example:
+            // k1 is involved in two mapping expr args, year(k1) and t1=k1, so k1's varchar type will be replaced by DATETIME
+            replaceVarcharWithCastType(cloneExprsByName, srcTupleDesc, excludedColumns);
+        }
+
+        // in vectorized load, reanalyze exprs with the castExpr type;
+        // otherwise analyze exprs with varchar type
+        analyzeAllExprs(tbl, analyzer, exprsByName, mvDefineExpr, slotDescByName, useVectorizedLoad);
+        LOG.debug("after init column, exprMap: {}", exprsByName);
+    }
+
+    private static void analyzeAllExprs(Table tbl, Analyzer analyzer, Map<String, Expr> exprsByName,
+                                        Map<String, Expr> mvDefineExpr, Map<String, SlotDescriptor> slotDescByName,
+                                        boolean useVectorizedLoad) throws UserException {
         // analyze all exprs
         for (Map.Entry<String, Expr> entry : exprsByName.entrySet()) {
             ExprSubstitutionMap smap = new ExprSubstitutionMap();
@@ -1094,14 +1160,17 @@ public class Load {
             for (SlotRef slot : slots) {
                 SlotDescriptor slotDesc = slotDescByName.get(slot.getColumnName());
                 if (slotDesc == null) {
-                    if (entry.getKey().equalsIgnoreCase(Column.DELETE_SIGN)) {
-                        throw new UserException("unknown reference column in DELETE ON clause:" + slot.getColumnName());
-                    } else if (entry.getKey().equalsIgnoreCase(Column.SEQUENCE_COL)) {
-                        throw new UserException("unknown reference column in ORDER BY clause:" + slot.getColumnName());
-                    } else {
-                        throw new UserException("unknown reference column, column=" + entry.getKey()
-                                + ", reference=" + slot.getColumnName());
+                    if (entry.getKey() != null) {
+                        if (entry.getKey().equalsIgnoreCase(Column.DELETE_SIGN)) {
+                            throw new UserException("unknown reference column in DELETE ON clause:"
+                                    + slot.getColumnName());
+                        } else if (entry.getKey().equalsIgnoreCase(Column.SEQUENCE_COL)) {
+                            throw new UserException("unknown reference column in ORDER BY clause:"
+                                    + slot.getColumnName());
+                        }
                     }
+                    throw new UserException("unknown reference column, column=" + entry.getKey()
+                            + ", reference=" + slot.getColumnName());
                 }
                 smap.getLhs().add(slot);
                 smap.getRhs().add(new SlotRef(slotDesc));
@@ -1149,7 +1218,50 @@ public class Load {
 
             exprsByName.put(entry.getKey(), expr);
         }
-        LOG.debug("after init column, exprMap: {}", exprsByName);
     }
 
+    /**
+     * For columns that only exist in mapping expr args, replace the varchar type with the type inferred from exprs.
+     *
+     * @param excludedColumns columns whose type should not be inferred from exprs, i.e.
+     *                        columns that exist in both the schema and the expr args.
+     */
+    private static void replaceVarcharWithCastType(Map<String, Expr> exprsByName, TupleDescriptor srcTupleDesc,
+                                                   Set<String> excludedColumns) throws UserException {
+        // if there is more than one candidate, choose the last non-varchar type.
+        // for example:
+        // k1 is involved in two mapping expr args, year(k1) and t1=k1, so k1's varchar type will be replaced by DATETIME.
+        for (Map.Entry<String, Expr> entry : exprsByName.entrySet()) {
+            List<CastExpr> casts = Lists.newArrayList();
+            // exclude explicit casts, for example: cast(k1 as date)
+            entry.getValue().collect(Expr.IS_VARCHAR_SLOT_REF_IMPLICIT_CAST, casts);
+            if (casts.isEmpty()) {
+                continue;
+            }
+
+            for (CastExpr cast : casts) {
+                Expr child = cast.getChild(0);
+                Type type = cast.getType();
+                if (type.isVarchar()) {
+                    continue;
+                }
+
+                SlotRef slotRef = (SlotRef) child;
+                String columnName = slotRef.getColumn().getName();
+                if (excludedColumns.contains(columnName)) {
+                    continue;
+                }
+
+                // replace the src slot desc type with the cast return type
+                int slotId = slotRef.getSlotId().asInt();
+                SlotDescriptor srcSlotDesc = srcTupleDesc.getSlot(slotId);
+                if (srcSlotDesc == null) {
+                    throw new UserException("Unknown source slot descriptor. id: " + slotId);
+                }
+                srcSlotDesc.setType(type);
+                srcSlotDesc.setColumn(new Column(columnName, type));
+            }
+        }
+    }
+
     public static void rewriteColumns(LoadTaskInfo.ImportColumnDescs columnDescs) {
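A concrete illustration of the inference, in line with the comment above (hypothetical mapping t1 = year(k1)): analysis wraps the VARCHAR slot k1 in an implicit cast to DATETIME, the new predicate collects that cast, and the loop rewrites k1's source slot:

    // exprsByName after analysis, roughly: t1 -> year(CAST(k1 AS DATETIME))
    // cast.getType() is DATETIME and k1 is not in excludedColumns, so:
    srcSlotDesc.setType(type);                            // k1's slot becomes DATETIME
    srcSlotDesc.setColumn(new Column(columnName, type));
    // had k1 also been loaded as a plain column, excludedColumns would keep it VARCHAR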
@@ -37,6 +37,7 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.common.util.VectorizedUtil;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.Load;
 import org.apache.doris.load.loadv2.LoadTask;
@@ -268,7 +269,8 @@ public class BrokerScanNode extends LoadScanNode {
 
         Load.initColumns(targetTable, columnDescs,
                 context.fileGroup.getColumnToHadoopFunction(), context.exprMap, analyzer,
-                context.srcTupleDescriptor, context.slotDescByName, context.params);
+                context.srcTupleDescriptor, context.slotDescByName, context.params,
+                formatType(context.fileGroup.getFileFormat(), ""), VectorizedUtil.isVectorized());
     }
 
     private TScanRangeLocations newLocations(TBrokerScanRangeParams params, BrokerDesc brokerDesc)
@@ -27,6 +27,7 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.VectorizedUtil;
 import org.apache.doris.load.Load;
 import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.task.LoadTaskInfo;
@@ -140,7 +141,8 @@ public class StreamLoadScanNode extends LoadScanNode {
         }
 
         Load.initColumns(dstTable, columnExprDescs, null /* no hadoop function */,
-                exprsByName, analyzer, srcTupleDesc, slotDescByName, params);
+                exprsByName, analyzer, srcTupleDesc, slotDescByName, params,
+                taskInfo.getFormatType(), VectorizedUtil.isVectorized());
 
         // analyze where statement
         initAndSetPrecedingFilter(taskInfo.getPrecedingFilter(), this.srcTupleDesc, analyzer);
@@ -260,6 +260,8 @@ public class StreamLoadTask implements LoadTaskInfo {
         }
         switch (request.getFileType()) {
+            case FILE_STREAM:
+                // fall through to case FILE_LOCAL
             case FILE_LOCAL:
                 path = request.getPath();
                 break;
             default: