[refactor](multi catalog)Refactor FileQueryScanNode init and finalize mothods(#18954)

Refactor FileQueryScanNode init and finalize methods.
Handle schema related initialization in init method, handle scan range generation in finalize method.
This commit is contained in:
Jibing-Li
2023-04-25 11:18:21 +08:00
committed by GitHub
parent 228cc90e4e
commit a836a6a4fe
3 changed files with 117 additions and 174 deletions

View File

@ -47,7 +47,6 @@ import org.apache.doris.planner.external.LoadScanProvider;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TExpr;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TUniqueId;
@ -69,26 +68,14 @@ public class FileLoadScanNode extends FileScanNode {
public static class ParamCreateContext {
public BrokerFileGroup fileGroup;
public List<Expr> conjuncts;
public TupleDescriptor destTupleDescriptor;
public Map<String, SlotDescriptor> destSlotDescByName;
// === Set when init ===
public TupleDescriptor srcTupleDescriptor;
public Map<String, SlotDescriptor> srcSlotDescByName;
public Map<String, Expr> exprMap;
public String timezone;
// === Set when init ===
public TFileScanRangeParams params;
public void createDestSlotMap() {
Preconditions.checkNotNull(destTupleDescriptor);
destSlotDescByName = Maps.newHashMap();
for (SlotDescriptor slot : destTupleDescriptor.getSlots()) {
destSlotDescByName.put(slot.getColumn().getName(), slot);
}
}
}
// Save all info about load attributes and files.
@ -131,14 +118,11 @@ public class FileLoadScanNode extends FileScanNode {
@Override
public void init(Analyzer analyzer) throws UserException {
super.init(analyzer);
for (FileGroupInfo fileGroupInfo : fileGroupInfos) {
this.scanProviders.add(new LoadScanProvider(fileGroupInfo, desc));
}
backendPolicy.init();
numNodes = backendPolicy.numBackends();
initParamCreateContexts(analyzer);
}
@ -146,12 +130,11 @@ public class FileLoadScanNode extends FileScanNode {
private void initParamCreateContexts(Analyzer analyzer) throws UserException {
for (FileScanProviderIf scanProvider : scanProviders) {
ParamCreateContext context = scanProvider.createContext(analyzer);
context.createDestSlotMap();
// set where and preceding filter.
// FIXME(cmy): we should support set different expr for different file group.
initAndSetPrecedingFilter(context.fileGroup.getPrecedingFilterExpr(), context.srcTupleDescriptor, analyzer);
initAndSetWhereExpr(context.fileGroup.getWhereExpr(), context.destTupleDescriptor, analyzer);
context.conjuncts = conjuncts;
setDefaultValueExprs(scanProvider, context.srcSlotDescByName, context.params, true);
this.contexts.add(context);
}
}
@ -214,7 +197,6 @@ public class FileLoadScanNode extends FileScanNode {
for (int i = 0; i < contexts.size(); ++i) {
FileLoadScanNode.ParamCreateContext context = contexts.get(i);
FileScanProviderIf scanProvider = scanProviders.get(i);
setDefaultValueExprs(scanProvider, context);
finalizeParamsForLoad(context, analyzer);
createScanRangeLocations(context, scanProvider);
this.inputSplitsNum += scanProvider.getInputSplitNum();
@ -222,46 +204,6 @@ public class FileLoadScanNode extends FileScanNode {
}
}
protected void setDefaultValueExprs(FileScanProviderIf scanProvider, ParamCreateContext context)
throws UserException {
TableIf tbl = scanProvider.getTargetTable();
Preconditions.checkNotNull(tbl);
TExpr tExpr = new TExpr();
tExpr.setNodes(Lists.newArrayList());
for (Column column : tbl.getBaseSchema()) {
Expr expr;
if (column.getDefaultValue() != null) {
if (column.getDefaultValueExprDef() != null) {
expr = column.getDefaultValueExpr();
} else {
expr = new StringLiteral(column.getDefaultValue());
}
} else {
if (column.isAllowNull()) {
// In load process, the source type is string.
expr = NullLiteral.create(org.apache.doris.catalog.Type.VARCHAR);
} else {
expr = null;
}
}
SlotDescriptor slotDesc = context.srcSlotDescByName.get(column.getName());
// if slot desc is null, which mean it is an unrelated slot, just skip.
// eg:
// (a, b, c) set (x=a, y=b, z=c)
// c does not exist in file, the z will be filled with null, even if z has default value.
// and if z is not nullable, the load will fail.
if (slotDesc != null) {
if (expr != null) {
expr = castToSlot(slotDesc, expr);
context.params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), expr.treeToThrift());
} else {
context.params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), tExpr);
}
}
}
}
protected void finalizeParamsForLoad(ParamCreateContext context,
Analyzer analyzer) throws UserException {
Map<String, SlotDescriptor> slotDescByName = context.srcSlotDescByName;

View File

@ -18,11 +18,8 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.FunctionGenTable;
@ -40,7 +37,6 @@ import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
import org.apache.doris.planner.external.iceberg.IcebergSource;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
import org.apache.doris.thrift.TExpr;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFileScanSlotInfo;
@ -80,43 +76,74 @@ public class FileQueryScanNode extends FileScanNode {
@Override
public void init(Analyzer analyzer) throws UserException {
super.init(analyzer);
computeColumnFilter();
if (this.desc.getTable() instanceof HMSExternalTable) {
HMSExternalTable hmsTable = (HMSExternalTable) this.desc.getTable();
initHMSExternalTable(hmsTable);
} else if (this.desc.getTable() instanceof FunctionGenTable) {
FunctionGenTable table = (FunctionGenTable) this.desc.getTable();
initFunctionGenTable(table, (ExternalFileTableValuedFunction) table.getTvf());
} else if (this.desc.getTable() instanceof IcebergExternalTable) {
IcebergExternalTable table = (IcebergExternalTable) this.desc.getTable();
initIcebergExternalTable(table);
}
backendPolicy.init();
numNodes = backendPolicy.numBackends();
initScanRangeParams();
doInitialize();
}
/**
* Init ExternalFileScanNode, ONLY used for Nereids. Should NOT use this function in anywhere else.
*/
public void init() throws UserException {
// prepare for partition prune
// computeColumnFilter();
doInitialize();
}
// Init scan provider and schema related params.
private void doInitialize() throws UserException {
Preconditions.checkNotNull(desc);
computeColumnFilter();
initScanProvider();
initBackendPolicy();
initSchemaParams();
}
private void initScanProvider() throws UserException {
if (this.desc.getTable() instanceof HMSExternalTable) {
HMSExternalTable hmsTable = (HMSExternalTable) this.desc.getTable();
initHMSExternalTable(hmsTable);
initHMSTableScanProvider(hmsTable);
} else if (this.desc.getTable() instanceof FunctionGenTable) {
FunctionGenTable table = (FunctionGenTable) this.desc.getTable();
initFunctionGenTable(table, (ExternalFileTableValuedFunction) table.getTvf());
initTVFScanProvider(table, (ExternalFileTableValuedFunction) table.getTvf());
} else if (this.desc.getTable() instanceof IcebergExternalTable) {
IcebergExternalTable table = (IcebergExternalTable) this.desc.getTable();
initIcebergExternalTable(table);
initIcebergScanProvider(table);
}
}
// Init schema (Tuple/Slot) related params.
private void initSchemaParams() throws UserException {
destSlotDescByName = Maps.newHashMap();
for (SlotDescriptor slot : desc.getSlots()) {
destSlotDescByName.put(slot.getColumn().getName(), slot);
}
params = new TFileScanRangeParams();
params.setDestTupleId(desc.getId().asInt());
List<String> partitionKeys = scanProvider.getPathPartitionKeys();
List<Column> columns = desc.getTable().getBaseSchema(false);
params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size());
for (SlotDescriptor slot : desc.getSlots()) {
if (!slot.isMaterialized()) {
continue;
}
TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();
slotInfo.setSlotId(slot.getId().asInt());
slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName()));
params.addToRequiredSlots(slotInfo);
}
setDefaultValueExprs(scanProvider, destSlotDescByName, params, false);
setColumnPositionMappingForTextFile();
// For query, set src tuple id to -1.
params.setSrcTupleId(-1);
TableIf table = desc.getTable();
// Slot to schema id map is used for supporting hive 1.x orc internal column name (col0, col1, col2...)
if (table instanceof HMSExternalTable) {
if (((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
genSlotToSchemaIdMap();
}
}
}
private void initBackendPolicy() throws UserException {
backendPolicy.init();
numNodes = backendPolicy.numBackends();
initScanRangeParams();
}
/**
@ -139,7 +166,7 @@ public class FileQueryScanNode extends FileScanNode {
}
}
private void initHMSExternalTable(HMSExternalTable hmsTable) throws UserException {
private void initHMSTableScanProvider(HMSExternalTable hmsTable) throws UserException {
Preconditions.checkNotNull(hmsTable);
if (hmsTable.isView()) {
@ -174,7 +201,7 @@ public class FileQueryScanNode extends FileScanNode {
}
}
private void initIcebergExternalTable(IcebergExternalTable icebergTable) throws UserException {
private void initIcebergScanProvider(IcebergExternalTable icebergTable) throws UserException {
Preconditions.checkNotNull(icebergTable);
if (icebergTable.isView()) {
throw new AnalysisException(
@ -197,68 +224,26 @@ public class FileQueryScanNode extends FileScanNode {
}
}
private void initFunctionGenTable(FunctionGenTable table, ExternalFileTableValuedFunction tvf) {
private void initTVFScanProvider(FunctionGenTable table, ExternalFileTableValuedFunction tvf) {
Preconditions.checkNotNull(table);
scanProvider = new TVFScanProvider(table, desc, tvf);
}
// Create a corresponding TFileScanRangeParams
private void initScanRangeParams() throws UserException {
Preconditions.checkNotNull(desc);
destSlotDescByName = Maps.newHashMap();
for (SlotDescriptor slot : desc.getSlots()) {
destSlotDescByName.put(slot.getColumn().getName(), slot);
}
params = new TFileScanRangeParams();
params.setDestTupleId(desc.getId().asInt());
List<String> partitionKeys = scanProvider.getPathPartitionKeys();
List<Column> columns = desc.getTable().getBaseSchema(false);
params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size());
for (SlotDescriptor slot : desc.getSlots()) {
if (!slot.isMaterialized()) {
continue;
}
TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();
slotInfo.setSlotId(slot.getId().asInt());
slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName()));
params.addToRequiredSlots(slotInfo);
}
}
@Override
public void finalize(Analyzer analyzer) throws UserException {
setDefaultValueExprs();
setColumnPositionMappingForTextFile();
params.setSrcTupleId(-1);
createScanRangeLocations(conjuncts, params, scanProvider);
this.inputSplitsNum += scanProvider.getInputSplitNum();
this.totalFileSize += scanProvider.getInputFileSize();
TableIf table = desc.getTable();
if (table instanceof HMSExternalTable) {
if (((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
genSlotToSchemaIdMap();
}
}
if (scanProvider instanceof HiveScanProvider) {
this.totalPartitionNum = ((HiveScanProvider) scanProvider).getTotalPartitionNum();
this.readPartitionNum = ((HiveScanProvider) scanProvider).getReadPartitionNum();
}
doFinalize();
}
@Override
public void finalizeForNereids() throws UserException {
setDefaultValueExprs();
setColumnPositionMappingForTextFile();
params.setSrcTupleId(-1);
doFinalize();
}
// Create scan range locations and the statistics.
private void doFinalize() throws UserException {
createScanRangeLocations(conjuncts, params, scanProvider);
this.inputSplitsNum += scanProvider.getInputSplitNum();
this.totalFileSize += scanProvider.getInputFileSize();
TableIf table = desc.getTable();
if (table instanceof HMSExternalTable) {
if (((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
genSlotToSchemaIdMap();
}
}
if (scanProvider instanceof HiveScanProvider) {
this.totalPartitionNum = ((HiveScanProvider) scanProvider).getTotalPartitionNum();
this.readPartitionNum = ((HiveScanProvider) scanProvider).getReadPartitionNum();
@ -285,45 +270,7 @@ public class FileQueryScanNode extends FileScanNode {
params.setColumnIdxs(columnIdxs);
}
protected void setDefaultValueExprs()
throws UserException {
TableIf tbl = scanProvider.getTargetTable();
Preconditions.checkNotNull(tbl);
TExpr tExpr = new TExpr();
tExpr.setNodes(Lists.newArrayList());
for (Column column : tbl.getBaseSchema()) {
Expr expr;
if (column.getDefaultValue() != null) {
if (column.getDefaultValueExprDef() != null) {
expr = column.getDefaultValueExpr();
} else {
expr = new StringLiteral(column.getDefaultValue());
}
} else {
if (column.isAllowNull()) {
expr = NullLiteral.create(column.getType());
} else {
expr = null;
}
}
SlotDescriptor slotDesc = destSlotDescByName.get(column.getName());
// if slot desc is null, which mean it is an unrelated slot, just skip.
// eg:
// (a, b, c) set (x=a, y=b, z=c)
// c does not exist in file, the z will be filled with null, even if z has default value.
// and if z is not nullable, the load will fail.
if (slotDesc != null) {
if (expr != null) {
expr = castToSlot(slotDesc, expr);
params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), expr.treeToThrift());
} else {
params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), tExpr);
}
}
}
}
// To Support Hive 1.x orc internal column name like (_col0, _col1, _col2...)
private void genSlotToSchemaIdMap() {
List<Column> baseSchema = desc.getTable().getBaseSchema();
Map<String, Integer> columnNameToPosition = Maps.newHashMap();

View File

@ -18,12 +18,18 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.FileLoadScanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TExpr;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileScanNode;
import org.apache.doris.thrift.TFileScanRangeParams;
@ -31,6 +37,7 @@ import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
@ -40,6 +47,7 @@ import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
/**
* Base class for External File Scan, including external query and load.
@ -156,4 +164,50 @@ public class FileScanNode extends ExternalScanNode {
throws UserException {
scanProvider.createScanRangeLocations(conjuncts, params, backendPolicy, scanRangeLocations);
}
protected void setDefaultValueExprs(FileScanProviderIf scanProvider,
Map<String, SlotDescriptor> slotDescByName,
TFileScanRangeParams params,
boolean useVarcharAsNull) throws UserException {
TableIf tbl = scanProvider.getTargetTable();
Preconditions.checkNotNull(tbl);
TExpr tExpr = new TExpr();
tExpr.setNodes(Lists.newArrayList());
for (Column column : tbl.getBaseSchema()) {
Expr expr;
if (column.getDefaultValue() != null) {
if (column.getDefaultValueExprDef() != null) {
expr = column.getDefaultValueExpr();
} else {
expr = new StringLiteral(column.getDefaultValue());
}
} else {
if (column.isAllowNull()) {
// For load, use Varchar as Null, for query, use column type.
if (useVarcharAsNull) {
expr = NullLiteral.create(org.apache.doris.catalog.Type.VARCHAR);
} else {
expr = NullLiteral.create(column.getType());
}
} else {
expr = null;
}
}
SlotDescriptor slotDesc = slotDescByName.get(column.getName());
// if slot desc is null, which mean it is an unrelated slot, just skip.
// eg:
// (a, b, c) set (x=a, y=b, z=c)
// c does not exist in file, the z will be filled with null, even if z has default value.
// and if z is not nullable, the load will fail.
if (slotDesc != null) {
if (expr != null) {
expr = castToSlot(slotDesc, expr);
params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), expr.treeToThrift());
} else {
params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), tExpr);
}
}
}
}
}