From a836a6a4feb34bfaca2354f9e797f65a5cfb81e8 Mon Sep 17 00:00:00 2001 From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com> Date: Tue, 25 Apr 2023 11:18:21 +0800 Subject: [PATCH] [refactor](multi catalog)Refactor FileQueryScanNode init and finalize methods(#18954) Refactor FileQueryScanNode init and finalize methods. Handle schema related initialization in init method, handle scan range generation in finalize method. --- .../doris/planner/FileLoadScanNode.java | 60 +----- .../planner/external/FileQueryScanNode.java | 177 ++++++------------ .../doris/planner/external/FileScanNode.java | 54 ++++++ 3 files changed, 117 insertions(+), 174 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java index 4b8768cc8b..c3639f34e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java @@ -47,7 +47,6 @@ import org.apache.doris.planner.external.LoadScanProvider; import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TBrokerFileStatus; -import org.apache.doris.thrift.TExpr; import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TUniqueId; @@ -69,26 +68,14 @@ public class FileLoadScanNode extends FileScanNode { public static class ParamCreateContext { public BrokerFileGroup fileGroup; - public List conjuncts; - public TupleDescriptor destTupleDescriptor; - public Map destSlotDescByName; // === Set when init === public TupleDescriptor srcTupleDescriptor; public Map srcSlotDescByName; public Map exprMap; public String timezone; // === Set when init === - public TFileScanRangeParams params; - - public void createDestSlotMap() { - Preconditions.checkNotNull(destTupleDescriptor); - destSlotDescByName = 
Maps.newHashMap(); - for (SlotDescriptor slot : destTupleDescriptor.getSlots()) { - destSlotDescByName.put(slot.getColumn().getName(), slot); - } - } } // Save all info about load attributes and files. @@ -131,14 +118,11 @@ public class FileLoadScanNode extends FileScanNode { @Override public void init(Analyzer analyzer) throws UserException { super.init(analyzer); - for (FileGroupInfo fileGroupInfo : fileGroupInfos) { this.scanProviders.add(new LoadScanProvider(fileGroupInfo, desc)); } - backendPolicy.init(); numNodes = backendPolicy.numBackends(); - initParamCreateContexts(analyzer); } @@ -146,12 +130,11 @@ public class FileLoadScanNode extends FileScanNode { private void initParamCreateContexts(Analyzer analyzer) throws UserException { for (FileScanProviderIf scanProvider : scanProviders) { ParamCreateContext context = scanProvider.createContext(analyzer); - context.createDestSlotMap(); // set where and preceding filter. // FIXME(cmy): we should support set different expr for different file group. 
initAndSetPrecedingFilter(context.fileGroup.getPrecedingFilterExpr(), context.srcTupleDescriptor, analyzer); initAndSetWhereExpr(context.fileGroup.getWhereExpr(), context.destTupleDescriptor, analyzer); - context.conjuncts = conjuncts; + setDefaultValueExprs(scanProvider, context.srcSlotDescByName, context.params, true); this.contexts.add(context); } } @@ -214,7 +197,6 @@ public class FileLoadScanNode extends FileScanNode { for (int i = 0; i < contexts.size(); ++i) { FileLoadScanNode.ParamCreateContext context = contexts.get(i); FileScanProviderIf scanProvider = scanProviders.get(i); - setDefaultValueExprs(scanProvider, context); finalizeParamsForLoad(context, analyzer); createScanRangeLocations(context, scanProvider); this.inputSplitsNum += scanProvider.getInputSplitNum(); @@ -222,46 +204,6 @@ public class FileLoadScanNode extends FileScanNode { } } - protected void setDefaultValueExprs(FileScanProviderIf scanProvider, ParamCreateContext context) - throws UserException { - TableIf tbl = scanProvider.getTargetTable(); - Preconditions.checkNotNull(tbl); - TExpr tExpr = new TExpr(); - tExpr.setNodes(Lists.newArrayList()); - - for (Column column : tbl.getBaseSchema()) { - Expr expr; - if (column.getDefaultValue() != null) { - if (column.getDefaultValueExprDef() != null) { - expr = column.getDefaultValueExpr(); - } else { - expr = new StringLiteral(column.getDefaultValue()); - } - } else { - if (column.isAllowNull()) { - // In load process, the source type is string. - expr = NullLiteral.create(org.apache.doris.catalog.Type.VARCHAR); - } else { - expr = null; - } - } - SlotDescriptor slotDesc = context.srcSlotDescByName.get(column.getName()); - // if slot desc is null, which mean it is an unrelated slot, just skip. - // eg: - // (a, b, c) set (x=a, y=b, z=c) - // c does not exist in file, the z will be filled with null, even if z has default value. - // and if z is not nullable, the load will fail. 
- if (slotDesc != null) { - if (expr != null) { - expr = castToSlot(slotDesc, expr); - context.params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), expr.treeToThrift()); - } else { - context.params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), tExpr); - } - } - } - } - protected void finalizeParamsForLoad(ParamCreateContext context, Analyzer analyzer) throws UserException { Map slotDescByName = context.srcSlotDescByName; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index aa04d54f3c..48f35eb363 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -18,11 +18,8 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotId; -import org.apache.doris.analysis.StringLiteral; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.FunctionGenTable; @@ -40,7 +37,6 @@ import org.apache.doris.planner.external.iceberg.IcebergScanProvider; import org.apache.doris.planner.external.iceberg.IcebergSource; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.tablefunction.ExternalFileTableValuedFunction; -import org.apache.doris.thrift.TExpr; import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.TFileScanSlotInfo; @@ -80,43 +76,74 @@ public class FileQueryScanNode extends FileScanNode { @Override public void init(Analyzer analyzer) throws UserException { super.init(analyzer); - - computeColumnFilter(); - if (this.desc.getTable() instanceof HMSExternalTable) { - HMSExternalTable hmsTable = 
(HMSExternalTable) this.desc.getTable(); - initHMSExternalTable(hmsTable); - } else if (this.desc.getTable() instanceof FunctionGenTable) { - FunctionGenTable table = (FunctionGenTable) this.desc.getTable(); - initFunctionGenTable(table, (ExternalFileTableValuedFunction) table.getTvf()); - } else if (this.desc.getTable() instanceof IcebergExternalTable) { - IcebergExternalTable table = (IcebergExternalTable) this.desc.getTable(); - initIcebergExternalTable(table); - } - backendPolicy.init(); - numNodes = backendPolicy.numBackends(); - initScanRangeParams(); + doInitialize(); } /** * Init ExternalFileScanNode, ONLY used for Nereids. Should NOT use this function in anywhere else. */ public void init() throws UserException { - // prepare for partition prune - // computeColumnFilter(); + doInitialize(); + } + + // Init scan provider and schema related params. + private void doInitialize() throws UserException { + Preconditions.checkNotNull(desc); + computeColumnFilter(); + initScanProvider(); + initBackendPolicy(); + initSchemaParams(); + } + + private void initScanProvider() throws UserException { if (this.desc.getTable() instanceof HMSExternalTable) { HMSExternalTable hmsTable = (HMSExternalTable) this.desc.getTable(); - initHMSExternalTable(hmsTable); + initHMSTableScanProvider(hmsTable); } else if (this.desc.getTable() instanceof FunctionGenTable) { FunctionGenTable table = (FunctionGenTable) this.desc.getTable(); - initFunctionGenTable(table, (ExternalFileTableValuedFunction) table.getTvf()); + initTVFScanProvider(table, (ExternalFileTableValuedFunction) table.getTvf()); } else if (this.desc.getTable() instanceof IcebergExternalTable) { IcebergExternalTable table = (IcebergExternalTable) this.desc.getTable(); - initIcebergExternalTable(table); + initIcebergScanProvider(table); } + } + // Init schema (Tuple/Slot) related params. 
+ private void initSchemaParams() throws UserException { + destSlotDescByName = Maps.newHashMap(); + for (SlotDescriptor slot : desc.getSlots()) { + destSlotDescByName.put(slot.getColumn().getName(), slot); + } + params = new TFileScanRangeParams(); + params.setDestTupleId(desc.getId().asInt()); + List partitionKeys = scanProvider.getPathPartitionKeys(); + List columns = desc.getTable().getBaseSchema(false); + params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size()); + for (SlotDescriptor slot : desc.getSlots()) { + if (!slot.isMaterialized()) { + continue; + } + TFileScanSlotInfo slotInfo = new TFileScanSlotInfo(); + slotInfo.setSlotId(slot.getId().asInt()); + slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName())); + params.addToRequiredSlots(slotInfo); + } + setDefaultValueExprs(scanProvider, destSlotDescByName, params, false); + setColumnPositionMappingForTextFile(); + // For query, set src tuple id to -1. + params.setSrcTupleId(-1); + TableIf table = desc.getTable(); + // Slot to schema id map is used for supporting hive 1.x orc internal column name (_col0, _col1, _col2...) 
+ if (table instanceof HMSExternalTable) { + if (((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) { + genSlotToSchemaIdMap(); + } + } + } + + private void initBackendPolicy() throws UserException { backendPolicy.init(); numNodes = backendPolicy.numBackends(); - initScanRangeParams(); } /** @@ -139,7 +166,7 @@ public class FileQueryScanNode extends FileScanNode { } } - private void initHMSExternalTable(HMSExternalTable hmsTable) throws UserException { + private void initHMSTableScanProvider(HMSExternalTable hmsTable) throws UserException { Preconditions.checkNotNull(hmsTable); if (hmsTable.isView()) { @@ -174,7 +201,7 @@ public class FileQueryScanNode extends FileScanNode { } } - private void initIcebergExternalTable(IcebergExternalTable icebergTable) throws UserException { + private void initIcebergScanProvider(IcebergExternalTable icebergTable) throws UserException { Preconditions.checkNotNull(icebergTable); if (icebergTable.isView()) { throw new AnalysisException( @@ -197,68 +224,26 @@ public class FileQueryScanNode extends FileScanNode { } } - private void initFunctionGenTable(FunctionGenTable table, ExternalFileTableValuedFunction tvf) { + private void initTVFScanProvider(FunctionGenTable table, ExternalFileTableValuedFunction tvf) { Preconditions.checkNotNull(table); scanProvider = new TVFScanProvider(table, desc, tvf); } - // Create a corresponding TFileScanRangeParams - private void initScanRangeParams() throws UserException { - Preconditions.checkNotNull(desc); - destSlotDescByName = Maps.newHashMap(); - for (SlotDescriptor slot : desc.getSlots()) { - destSlotDescByName.put(slot.getColumn().getName(), slot); - } - params = new TFileScanRangeParams(); - params.setDestTupleId(desc.getId().asInt()); - List partitionKeys = scanProvider.getPathPartitionKeys(); - List columns = desc.getTable().getBaseSchema(false); - params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size()); - for (SlotDescriptor slot : desc.getSlots()) { 
- if (!slot.isMaterialized()) { - continue; - } - TFileScanSlotInfo slotInfo = new TFileScanSlotInfo(); - slotInfo.setSlotId(slot.getId().asInt()); - slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName())); - params.addToRequiredSlots(slotInfo); - } - } - @Override public void finalize(Analyzer analyzer) throws UserException { - setDefaultValueExprs(); - setColumnPositionMappingForTextFile(); - params.setSrcTupleId(-1); - createScanRangeLocations(conjuncts, params, scanProvider); - this.inputSplitsNum += scanProvider.getInputSplitNum(); - this.totalFileSize += scanProvider.getInputFileSize(); - TableIf table = desc.getTable(); - if (table instanceof HMSExternalTable) { - if (((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) { - genSlotToSchemaIdMap(); - } - } - if (scanProvider instanceof HiveScanProvider) { - this.totalPartitionNum = ((HiveScanProvider) scanProvider).getTotalPartitionNum(); - this.readPartitionNum = ((HiveScanProvider) scanProvider).getReadPartitionNum(); - } + doFinalize(); } @Override public void finalizeForNereids() throws UserException { - setDefaultValueExprs(); - setColumnPositionMappingForTextFile(); - params.setSrcTupleId(-1); + doFinalize(); + } + + // Create scan range locations and the statistics. 
+ private void doFinalize() throws UserException { createScanRangeLocations(conjuncts, params, scanProvider); this.inputSplitsNum += scanProvider.getInputSplitNum(); this.totalFileSize += scanProvider.getInputFileSize(); - TableIf table = desc.getTable(); - if (table instanceof HMSExternalTable) { - if (((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) { - genSlotToSchemaIdMap(); - } - } if (scanProvider instanceof HiveScanProvider) { this.totalPartitionNum = ((HiveScanProvider) scanProvider).getTotalPartitionNum(); this.readPartitionNum = ((HiveScanProvider) scanProvider).getReadPartitionNum(); @@ -285,45 +270,7 @@ public class FileQueryScanNode extends FileScanNode { params.setColumnIdxs(columnIdxs); } - protected void setDefaultValueExprs() - throws UserException { - TableIf tbl = scanProvider.getTargetTable(); - Preconditions.checkNotNull(tbl); - TExpr tExpr = new TExpr(); - tExpr.setNodes(Lists.newArrayList()); - - for (Column column : tbl.getBaseSchema()) { - Expr expr; - if (column.getDefaultValue() != null) { - if (column.getDefaultValueExprDef() != null) { - expr = column.getDefaultValueExpr(); - } else { - expr = new StringLiteral(column.getDefaultValue()); - } - } else { - if (column.isAllowNull()) { - expr = NullLiteral.create(column.getType()); - } else { - expr = null; - } - } - SlotDescriptor slotDesc = destSlotDescByName.get(column.getName()); - // if slot desc is null, which mean it is an unrelated slot, just skip. - // eg: - // (a, b, c) set (x=a, y=b, z=c) - // c does not exist in file, the z will be filled with null, even if z has default value. - // and if z is not nullable, the load will fail. 
- if (slotDesc != null) { - if (expr != null) { - expr = castToSlot(slotDesc, expr); - params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), expr.treeToThrift()); - } else { - params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), tExpr); - } - } - } - } - + // To Support Hive 1.x orc internal column name like (_col0, _col1, _col2...) private void genSlotToSchemaIdMap() { List baseSchema = desc.getTable().getBaseSchema(); Map columnNameToPosition = Maps.newHashMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java index 18f878e41b..364844ef36 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java @@ -18,12 +18,18 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.NullLiteral; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.StringLiteral; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.TableIf; import org.apache.doris.common.UserException; import org.apache.doris.planner.FileLoadScanNode; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TExpr; import org.apache.doris.thrift.TFileRangeDesc; import org.apache.doris.thrift.TFileScanNode; import org.apache.doris.thrift.TFileScanRangeParams; @@ -31,6 +37,7 @@ import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; import org.apache.doris.thrift.TScanRangeLocations; +import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; @@ 
-40,6 +47,7 @@ import org.apache.logging.log4j.Logger; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Map; /** * Base class for External File Scan, including external query and load. @@ -156,4 +164,50 @@ public class FileScanNode extends ExternalScanNode { throws UserException { scanProvider.createScanRangeLocations(conjuncts, params, backendPolicy, scanRangeLocations); } + + protected void setDefaultValueExprs(FileScanProviderIf scanProvider, + Map slotDescByName, + TFileScanRangeParams params, + boolean useVarcharAsNull) throws UserException { + TableIf tbl = scanProvider.getTargetTable(); + Preconditions.checkNotNull(tbl); + TExpr tExpr = new TExpr(); + tExpr.setNodes(Lists.newArrayList()); + + for (Column column : tbl.getBaseSchema()) { + Expr expr; + if (column.getDefaultValue() != null) { + if (column.getDefaultValueExprDef() != null) { + expr = column.getDefaultValueExpr(); + } else { + expr = new StringLiteral(column.getDefaultValue()); + } + } else { + if (column.isAllowNull()) { + // For load, use Varchar as Null, for query, use column type. + if (useVarcharAsNull) { + expr = NullLiteral.create(org.apache.doris.catalog.Type.VARCHAR); + } else { + expr = NullLiteral.create(column.getType()); + } + } else { + expr = null; + } + } + SlotDescriptor slotDesc = slotDescByName.get(column.getName()); + // if slot desc is null, which mean it is an unrelated slot, just skip. + // eg: + // (a, b, c) set (x=a, y=b, z=c) + // c does not exist in file, the z will be filled with null, even if z has default value. + // and if z is not nullable, the load will fail. + if (slotDesc != null) { + if (expr != null) { + expr = castToSlot(slotDesc, expr); + params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), expr.treeToThrift()); + } else { + params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), tExpr); + } + } + } + } }