diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 93f0cd8966..ffad6e33a0 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1727,9 +1727,6 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static boolean enable_quantile_state_type = true; - @ConfField - public static boolean enable_vectorized_load = true; - @ConfField public static boolean enable_pipeline_load = false; @@ -1797,12 +1794,6 @@ public class Config extends ConfigBase { @ConfField(mutable = true) public static long remote_fragment_exec_timeout_ms = 5000; // 5 sec - /** - * Temp config, should be removed when new file scan node is ready. - */ - @ConfField(mutable = true) - public static boolean enable_new_load_scan_node = true; - /** * Max data version of backends serialize block. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 9514eecaf3..cb90c075ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -137,7 +137,6 @@ public class LoadLoadingTask extends LoadTask { planner.getFragments(), planner.getScanNodes(), planner.getTimezone(), loadZeroTolerance); curCoordinator.setQueryType(TQueryType.LOAD); curCoordinator.setExecMemoryLimit(execMemLimit); - curCoordinator.setExecVecEngine(Config.enable_vectorized_load); curCoordinator.setExecPipEngine(Config.enable_pipeline_load); /* * For broker load job, user only need to set mem limit by 'exec_mem_limit' property. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index 31f30dd281..2daed4e237 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -36,12 +36,12 @@ import org.apache.doris.common.util.DebugUtil; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.planner.DataPartition; +import org.apache.doris.planner.FileLoadScanNode; import org.apache.doris.planner.OlapTableSink; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanFragmentId; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; -import org.apache.doris.planner.external.ExternalFileScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TUniqueId; @@ -113,32 +113,28 @@ public class LoadingTaskPlanner { // Generate tuple descriptor TupleDescriptor destTupleDesc = descTable.createTupleDescriptor(); TupleDescriptor scanTupleDesc = destTupleDesc; - if (Config.enable_vectorized_load) { - scanTupleDesc = descTable.createTupleDescriptor("ScanTuple"); - } + scanTupleDesc = descTable.createTupleDescriptor("ScanTuple"); // use full schema to fill the descriptor table for (Column col : table.getFullSchema()) { SlotDescriptor slotDesc = descTable.addSlotDescriptor(destTupleDesc); slotDesc.setIsMaterialized(true); slotDesc.setColumn(col); slotDesc.setIsNullable(col.isAllowNull()); - if (Config.enable_vectorized_load) { - SlotDescriptor scanSlotDesc = descTable.addSlotDescriptor(scanTupleDesc); - scanSlotDesc.setIsMaterialized(true); - scanSlotDesc.setColumn(col); - scanSlotDesc.setIsNullable(col.isAllowNull()); - if (fileGroups.size() > 0) { - for (ImportColumnDesc importColumnDesc : fileGroups.get(0).getColumnExprList()) { - try { - if (!importColumnDesc.isColumn() && importColumnDesc.getColumnName() != null - && importColumnDesc.getColumnName().equals(col.getName())) { - scanSlotDesc.setIsNullable(importColumnDesc.getExpr().isNullable()); - break; - } - } catch (Exception e) { - // An exception may be thrown here because the `importColumnDesc.getExpr()` is not analyzed - // now. We just skip this case here. + SlotDescriptor scanSlotDesc = descTable.addSlotDescriptor(scanTupleDesc); + scanSlotDesc.setIsMaterialized(true); + scanSlotDesc.setColumn(col); + scanSlotDesc.setIsNullable(col.isAllowNull()); + if (fileGroups.size() > 0) { + for (ImportColumnDesc importColumnDesc : fileGroups.get(0).getColumnExprList()) { + try { + if (!importColumnDesc.isColumn() && importColumnDesc.getColumnName() != null + && importColumnDesc.getColumnName().equals(col.getName())) { + scanSlotDesc.setIsNullable(importColumnDesc.getExpr().isNullable()); + break; } + } catch (Exception e) { + // An exception may be thrown here because the `importColumnDesc.getExpr()` is not analyzed + // now. We just skip this case here. } } } @@ -165,14 +161,12 @@ public class LoadingTaskPlanner { // Generate plan trees // 1. 
Broker scan node ScanNode scanNode; - scanNode = new ExternalFileScanNode(new PlanNodeId(nextNodeId++), scanTupleDesc, false); - ((ExternalFileScanNode) scanNode).setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, + scanNode = new FileLoadScanNode(new PlanNodeId(nextNodeId++), scanTupleDesc); + ((FileLoadScanNode) scanNode).setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, fileStatusesList, filesAdded, strictMode, loadParallelism, userInfo); scanNode.init(analyzer); scanNode.finalize(analyzer); - if (Config.enable_vectorized_load) { - scanNode.convertToVectorized(); - } + scanNode.convertToVectorized(); scanNodes.add(scanNode); descTable.computeStatAndMemLayout(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index e19a027844..6a22f11c72 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -139,7 +139,7 @@ import org.apache.doris.planner.SetOperationNode; import org.apache.doris.planner.SortNode; import org.apache.doris.planner.TableFunctionNode; import org.apache.doris.planner.UnionNode; -import org.apache.doris.planner.external.ExternalFileScanNode; +import org.apache.doris.planner.external.FileQueryScanNode; import org.apache.doris.tablefunction.TableValuedFunctionIf; import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TPushAggOp; @@ -592,7 +592,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, PlanTranslatorContext> diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java new file mode 100644 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java + private final List<FileGroupInfo> fileGroupInfos = Lists.newArrayList(); + // For load, the num of providers equals the num of file group infos. + private final List<FileScanProviderIf> scanProviders = Lists.newArrayList(); + // For load, the num of ParamCreateContext equals the num of file group infos. + private final List<ParamCreateContext> contexts = Lists.newArrayList(); + + /** + * External file scan node for loading data from files. + * These scan nodes do not have corresponding catalog/database/table info, so there is no need to do a priv check. + */ + public FileLoadScanNode(PlanNodeId id, TupleDescriptor desc) { + super(id, desc, "FILE_LOAD_SCAN_NODE", StatisticalType.FILE_SCAN_NODE, false); + } + + // Only for broker load job. + public void setLoadInfo(long loadJobId, long txnId, Table targetTable, BrokerDesc brokerDesc, + List<BrokerFileGroup> fileGroups, List<List<TBrokerFileStatus>> fileStatusesList, + int filesAdded, boolean strictMode, int loadParallelism, UserIdentity userIdentity) { + Preconditions.checkState(fileGroups.size() == fileStatusesList.size()); + for (int i = 0; i < fileGroups.size(); ++i) { + FileGroupInfo fileGroupInfo = new FileGroupInfo(loadJobId, txnId, targetTable, brokerDesc, + fileGroups.get(i), fileStatusesList.get(i), filesAdded, strictMode, loadParallelism); + fileGroupInfos.add(fileGroupInfo); + } + } + + // Only for stream load/routine load job.
+ public void setLoadInfo(TUniqueId loadId, long txnId, Table targetTable, BrokerDesc brokerDesc, + BrokerFileGroup fileGroup, TBrokerFileStatus fileStatus, boolean strictMode, + TFileType fileType, List<String> hiddenColumns) { + FileGroupInfo fileGroupInfo = new FileGroupInfo(loadId, txnId, targetTable, brokerDesc, + fileGroup, fileStatus, strictMode, fileType, hiddenColumns); + fileGroupInfos.add(fileGroupInfo); + } + + @Override + public void init(Analyzer analyzer) throws UserException { + super.init(analyzer); + + for (FileGroupInfo fileGroupInfo : fileGroupInfos) { + this.scanProviders.add(new LoadScanProvider(fileGroupInfo, desc)); + } + + backendPolicy.init(); + numNodes = backendPolicy.numBackends(); + + initParamCreateContexts(analyzer); + } + + // For each scan provider, create a corresponding ParamCreateContext + private void initParamCreateContexts(Analyzer analyzer) throws UserException { + for (FileScanProviderIf scanProvider : scanProviders) { + ParamCreateContext context = scanProvider.createContext(analyzer); + context.createDestSlotMap(); + // set where and preceding filter. + // FIXME(cmy): we should support setting different exprs for different file groups. + initAndSetPrecedingFilter(context.fileGroup.getPrecedingFilterExpr(), context.srcTupleDescriptor, analyzer); + initAndSetWhereExpr(context.fileGroup.getWhereExpr(), context.destTupleDescriptor, analyzer); + context.conjuncts = conjuncts; + this.contexts.add(context); + } + } + + private void initAndSetPrecedingFilter(Expr whereExpr, TupleDescriptor tupleDesc, Analyzer analyzer) + throws UserException { + Expr newWhereExpr = initWhereExpr(whereExpr, tupleDesc, analyzer); + if (newWhereExpr != null) { + addPreFilterConjuncts(newWhereExpr.getConjuncts()); + } + } + + private void initAndSetWhereExpr(Expr whereExpr, TupleDescriptor tupleDesc, Analyzer analyzer) + throws UserException { + Expr newWhereExpr = initWhereExpr(whereExpr, tupleDesc, analyzer); + if (newWhereExpr != null) { + addConjuncts(newWhereExpr.getConjuncts()); + } + } + + private Expr initWhereExpr(Expr whereExpr, TupleDescriptor tupleDesc, Analyzer analyzer) throws UserException { + if (whereExpr == null) { + return null; + } + + Map<String, SlotDescriptor> dstDescMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + for (SlotDescriptor slotDescriptor : tupleDesc.getSlots()) { + dstDescMap.put(slotDescriptor.getColumn().getName(), slotDescriptor); + } + + // substitute SlotRef in filter expression + // the where expr must be rewritten first to transform some predicates (eg: BetweenPredicate to BinaryPredicate) + Expr newWhereExpr = analyzer.getExprRewriter() + .rewrite(whereExpr, analyzer, ExprRewriter.ClauseType.WHERE_CLAUSE); + List<SlotRef> slots = Lists.newArrayList(); + newWhereExpr.collect(SlotRef.class, slots); + + ExprSubstitutionMap smap = new ExprSubstitutionMap(); + for (SlotRef slot : slots) { + SlotDescriptor slotDesc = dstDescMap.get(slot.getColumnName()); + if (slotDesc == null) { + throw new UserException( + "unknown column reference in where statement, reference=" + slot.getColumnName()); + } + smap.getLhs().add(slot); + smap.getRhs().add(new SlotRef(slotDesc)); + } + newWhereExpr = newWhereExpr.clone(smap); + newWhereExpr.analyze(analyzer); + if (!newWhereExpr.getType().equals(org.apache.doris.catalog.Type.BOOLEAN)) { + throw new UserException("where statement is not a valid statement return bool"); + } + return newWhereExpr; + } + + @Override + public void finalize(Analyzer analyzer) throws UserException { + Preconditions.checkState(contexts.size() == scanProviders.size(), +
contexts.size() + " vs. " + scanProviders.size()); + for (int i = 0; i < contexts.size(); ++i) { + FileScanNode.ParamCreateContext context = contexts.get(i); + FileScanProviderIf scanProvider = scanProviders.get(i); + setDefaultValueExprs(scanProvider, context); + finalizeParamsForLoad(context, analyzer); + createScanRangeLocations(context, scanProvider); + this.inputSplitsNum += scanProvider.getInputSplitNum(); + this.totalFileSize += scanProvider.getInputFileSize(); + } + } + + protected void setDefaultValueExprs(FileScanProviderIf scanProvider, ParamCreateContext context) + throws UserException { + TableIf tbl = scanProvider.getTargetTable(); + Preconditions.checkNotNull(tbl); + TExpr tExpr = new TExpr(); + tExpr.setNodes(Lists.newArrayList()); + + for (Column column : tbl.getBaseSchema()) { + Expr expr; + if (column.getDefaultValue() != null) { + if (column.getDefaultValueExprDef() != null) { + expr = column.getDefaultValueExpr(); + } else { + expr = new StringLiteral(column.getDefaultValue()); + } + } else { + if (column.isAllowNull()) { + // In load process, the source type is string. + expr = NullLiteral.create(org.apache.doris.catalog.Type.VARCHAR); + } else { + expr = null; + } + } + SlotDescriptor slotDesc = context.srcSlotDescByName.get(column.getName()); + // if slot desc is null, which mean it is an unrelated slot, just skip. + // eg: + // (a, b, c) set (x=a, y=b, z=c) + // c does not exist in file, the z will be filled with null, even if z has default value. + // and if z is not nullable, the load will fail. + if (slotDesc != null) { + if (expr != null) { + expr = castToSlot(slotDesc, expr); + context.params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), expr.treeToThrift()); + } else { + context.params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), tExpr); + } + } + } + } + + protected void finalizeParamsForLoad(ParamCreateContext context, + Analyzer analyzer) throws UserException { + Map slotDescByName = context.srcSlotDescByName; + Map exprMap = context.exprMap; + TupleDescriptor srcTupleDesc = context.srcTupleDescriptor; + boolean negative = context.fileGroup.isNegative(); + + TFileScanRangeParams params = context.params; + Map destSidToSrcSidWithoutTrans = Maps.newHashMap(); + for (SlotDescriptor destSlotDesc : desc.getSlots()) { + if (!destSlotDesc.isMaterialized()) { + continue; + } + Expr expr = null; + if (exprMap != null) { + expr = exprMap.get(destSlotDesc.getColumn().getName()); + } + if (expr == null) { + SlotDescriptor srcSlotDesc = slotDescByName.get(destSlotDesc.getColumn().getName()); + if (srcSlotDesc != null) { + destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt()); + // If dest is allow null, we set source to nullable + if (destSlotDesc.getColumn().isAllowNull()) { + srcSlotDesc.setIsNullable(true); + } + expr = new SlotRef(srcSlotDesc); + } else { + Column column = destSlotDesc.getColumn(); + if (column.getDefaultValue() != null) { + if (column.getDefaultValueExprDef() != null) { + expr = column.getDefaultValueExpr(); + } else { + expr = new StringLiteral(destSlotDesc.getColumn().getDefaultValue()); + } + } else { + if (column.isAllowNull()) { + expr = NullLiteral.create(column.getType()); + } else { + throw new AnalysisException("column has no source field, column=" + column.getName()); + } + } + } + } + + // check hll_hash + if (destSlotDesc.getType().getPrimitiveType() == PrimitiveType.HLL) { + if (!(expr instanceof FunctionCallExpr)) { + throw new AnalysisException("HLL column must use " + 
FunctionSet.HLL_HASH + " function, like " + + destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_HASH + "(xxx)"); + } + FunctionCallExpr fn = (FunctionCallExpr) expr; + if (!fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_HASH) && !fn.getFnName() + .getFunction().equalsIgnoreCase("hll_empty")) { + throw new AnalysisException("HLL column must use " + FunctionSet.HLL_HASH + " function, like " + + destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_HASH + "(xxx) or " + + destSlotDesc.getColumn().getName() + "=hll_empty()"); + } + expr.setType(org.apache.doris.catalog.Type.HLL); + } + + checkBitmapCompatibility(analyzer, destSlotDesc, expr); + checkQuantileStateCompatibility(analyzer, destSlotDesc, expr); + + if (negative && destSlotDesc.getColumn().getAggregationType() == AggregateType.SUM) { + expr = new ArithmeticExpr(ArithmeticExpr.Operator.MULTIPLY, expr, new IntLiteral(-1)); + expr.analyze(analyzer); + } + + // for jsonb type, use jsonb_parse_xxx to parse src string to jsonb. + // and if input string is not a valid json string, return null. + PrimitiveType dstType = destSlotDesc.getType().getPrimitiveType(); + PrimitiveType srcType = expr.getType().getPrimitiveType(); + if (dstType == PrimitiveType.JSONB + && (srcType == PrimitiveType.VARCHAR || srcType == PrimitiveType.STRING)) { + List args = Lists.newArrayList(); + args.add(expr); + String nullable = "notnull"; + if (destSlotDesc.getIsNullable() || expr.isNullable()) { + nullable = "nullable"; + } + String name = "jsonb_parse_" + nullable + "_error_to_null"; + expr = new FunctionCallExpr(name, args); + expr.analyze(analyzer); + } else if (dstType == PrimitiveType.VARIANT) { + // Generate SchemaChange expr for dynamicly generating columns + TableIf targetTbl = desc.getTable(); + expr = new SchemaChangeExpr((SlotRef) expr, (int) targetTbl.getId()); + } else { + expr = castToSlot(destSlotDesc, expr); + } + params.putToExprOfDestSlot(destSlotDesc.getId().asInt(), expr.treeToThrift()); + } + params.setDestSidToSrcSidWithoutTrans(destSidToSrcSidWithoutTrans); + params.setDestTupleId(desc.getId().asInt()); + params.setSrcTupleId(srcTupleDesc.getId().asInt()); + + // Need re compute memory layout after set some slot descriptor to nullable + srcTupleDesc.computeStatAndMemLayout(); + + if (!preFilterConjuncts.isEmpty()) { + Expr vPreFilterExpr = convertConjunctsToAndCompoundPredicate(preFilterConjuncts); + initCompoundPredicate(vPreFilterExpr); + params.setPreFilterExprs(vPreFilterExpr.treeToThrift()); + } + } + + protected void checkBitmapCompatibility(Analyzer analyzer, SlotDescriptor slotDesc, Expr expr) + throws AnalysisException { + if (slotDesc.getColumn().getAggregationType() == AggregateType.BITMAP_UNION) { + expr.analyze(analyzer); + if (!expr.getType().isBitmapType()) { + String errorMsg = String.format("bitmap column %s require the function return type is BITMAP", + slotDesc.getColumn().getName()); + throw new AnalysisException(errorMsg); + } + } + } + + protected void checkQuantileStateCompatibility(Analyzer analyzer, SlotDescriptor slotDesc, Expr expr) + throws AnalysisException { + if (slotDesc.getColumn().getAggregationType() == AggregateType.QUANTILE_UNION) { + expr.analyze(analyzer); + if (!expr.getType().isQuantileStateType()) { + String errorMsg = "quantile_state column %s require the function return type is QUANTILE_STATE"; + throw new AnalysisException(errorMsg); + } + } + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java index a7432fd9fb..2d4e57702c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java @@ -28,7 +28,7 @@ import org.apache.doris.analysis.TupleId; import org.apache.doris.catalog.Type; import org.apache.doris.common.IdGenerator; import org.apache.doris.common.util.BitUtil; -import org.apache.doris.planner.external.ExternalFileScanNode; +import org.apache.doris.planner.external.FileQueryScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; import org.apache.doris.thrift.TRuntimeFilterMode; @@ -379,7 +379,7 @@ public final class RuntimeFilterGenerator { * 2. Only olap scan nodes are supported: */ private void assignRuntimeFilters(ScanNode scanNode) { - if (!(scanNode instanceof OlapScanNode) && !(scanNode instanceof ExternalFileScanNode)) { + if (!(scanNode instanceof OlapScanNode) && !(scanNode instanceof FileQueryScanNode)) { return; } TupleId tid = scanNode.getTupleIds().get(0); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 99ebfa929a..4f0549449e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -66,7 +66,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; import org.apache.doris.common.UserException; import org.apache.doris.common.util.VectorizedUtil; -import org.apache.doris.planner.external.ExternalFileScanNode; +import org.apache.doris.planner.external.FileQueryScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; import org.apache.doris.thrift.TNullSide; @@ -1984,7 +1984,7 @@ public class SingleNodePlanner { case HIVE: throw new RuntimeException("Hive external table is not supported, try to use hive catalog please"); case ICEBERG: - scanNode = new ExternalFileScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); + scanNode = new FileQueryScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); break; case HUDI: throw new UserException( @@ -1997,7 +1997,7 @@ public class SingleNodePlanner { break; case HMS_EXTERNAL_TABLE: case ICEBERG_EXTERNAL_TABLE: - scanNode = new ExternalFileScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); + scanNode = new FileQueryScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); break; case ES_EXTERNAL_TABLE: scanNode = new EsScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "EsScanNode", true); @@ -2012,7 +2012,7 @@ public class SingleNodePlanner { break; } if (scanNode instanceof OlapScanNode || scanNode instanceof EsScanNode - || scanNode instanceof ExternalFileScanNode) { + || scanNode instanceof FileQueryScanNode) { if (analyzer.enableInferPredicate()) { PredicatePushDown.visitScanNode(scanNode, tblRef.getJoinOp(), analyzer); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index a2f9ec846b..eb2e2df30e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -46,7 +46,6 @@ import org.apache.doris.common.UserException; import org.apache.doris.load.BrokerFileGroup; 
import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.load.routineload.RoutineLoadJob; -import org.apache.doris.planner.external.ExternalFileScanNode; import org.apache.doris.service.FrontendOptions; import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.thrift.PaloInternalServiceVersion; @@ -134,11 +133,9 @@ public class StreamLoadPlanner { // construct tuple descriptor, used for dataSink tupleDesc = descTable.createTupleDescriptor("DstTableTuple"); TupleDescriptor scanTupleDesc = tupleDesc; - if (Config.enable_vectorized_load) { - // note: we use two tuples separately for Scan and Sink here to avoid wrong nullable info. - // construct tuple descriptor, used for scanNode - scanTupleDesc = descTable.createTupleDescriptor("ScanTuple"); - } + // note: we use two tuples separately for Scan and Sink here to avoid wrong nullable info. + // construct tuple descriptor, used for scanNode + scanTupleDesc = descTable.createTupleDescriptor("ScanTuple"); boolean negative = taskInfo.getNegative(); // here we should be full schema to fill the descriptor table for (Column col : destTable.getFullSchema()) { @@ -146,23 +143,20 @@ public class StreamLoadPlanner { slotDesc.setIsMaterialized(true); slotDesc.setColumn(col); slotDesc.setIsNullable(col.isAllowNull()); - - if (Config.enable_vectorized_load) { - SlotDescriptor scanSlotDesc = descTable.addSlotDescriptor(scanTupleDesc); - scanSlotDesc.setIsMaterialized(true); - scanSlotDesc.setColumn(col); - scanSlotDesc.setIsNullable(col.isAllowNull()); - for (ImportColumnDesc importColumnDesc : taskInfo.getColumnExprDescs().descs) { - try { - if (!importColumnDesc.isColumn() && importColumnDesc.getColumnName() != null - && importColumnDesc.getColumnName().equals(col.getName())) { - scanSlotDesc.setIsNullable(importColumnDesc.getExpr().isNullable()); - break; - } - } catch (Exception e) { - // An exception may be thrown here because the `importColumnDesc.getExpr()` is not analyzed now. - // We just skip this case here. + SlotDescriptor scanSlotDesc = descTable.addSlotDescriptor(scanTupleDesc); + scanSlotDesc.setIsMaterialized(true); + scanSlotDesc.setColumn(col); + scanSlotDesc.setIsNullable(col.isAllowNull()); + for (ImportColumnDesc importColumnDesc : taskInfo.getColumnExprDescs().descs) { + try { + if (!importColumnDesc.isColumn() && importColumnDesc.getColumnName() != null + && importColumnDesc.getColumnName().equals(col.getName())) { + scanSlotDesc.setIsNullable(importColumnDesc.getExpr().isNullable()); + break; } + } catch (Exception e) { + // An exception may be thrown here because the `importColumnDesc.getExpr()` is not analyzed now. + // We just skip this case here. } } if (negative && !col.isKey() && col.getAggregationType() != AggregateType.SUM) { @@ -172,9 +166,6 @@ public class StreamLoadPlanner { // Plan scan tuple of dynamic table if (destTable.isDynamicSchema()) { - if (!Config.enable_vectorized_load) { - throw new UserException("Only support vectorized load for dyanmic table: " + destTable.getName()); - } descTable.addReferencedTable(destTable); scanTupleDesc.setTable(destTable); // add a implict container column "DORIS_DYNAMIC_COL" for dynamic columns @@ -191,7 +182,7 @@ public class StreamLoadPlanner { } // create scan node - ExternalFileScanNode fileScanNode = new ExternalFileScanNode(new PlanNodeId(0), scanTupleDesc, false); + FileLoadScanNode fileScanNode = new FileLoadScanNode(new PlanNodeId(0), scanTupleDesc); // 1. 
create file group DataDescription dataDescription = new DataDescription(destTable.getName(), taskInfo); dataDescription.analyzeWithoutCheckPriv(db.getFullName()); @@ -215,9 +206,7 @@ public class StreamLoadPlanner { scanNode.init(analyzer); scanNode.finalize(analyzer); - if (Config.enable_vectorized_load) { - scanNode.convertToVectorized(); - } + scanNode.convertToVectorized(); descTable.computeStatAndMemLayout(); int timeout = taskInfo.getTimeout(); @@ -273,7 +262,6 @@ public class StreamLoadPlanner { // for stream load, we use exec_mem_limit to limit the memory usage of load channel. queryOptions.setLoadMemLimit(taskInfo.getMemLimit()); //load - queryOptions.setEnableVectorizedEngine(Config.enable_vectorized_load); queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load); queryOptions.setBeExecVersion(Config.be_exec_version); queryOptions.setIsReportSuccess(taskInfo.getEnableProfile()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java deleted file mode 100644 index 6c6089b16c..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java +++ /dev/null @@ -1,795 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.planner.external; - -import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.ArithmeticExpr; -import org.apache.doris.analysis.BrokerDesc; -import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.ExprSubstitutionMap; -import org.apache.doris.analysis.FunctionCallExpr; -import org.apache.doris.analysis.IntLiteral; -import org.apache.doris.analysis.NullLiteral; -import org.apache.doris.analysis.SchemaChangeExpr; -import org.apache.doris.analysis.SlotDescriptor; -import org.apache.doris.analysis.SlotId; -import org.apache.doris.analysis.SlotRef; -import org.apache.doris.analysis.StringLiteral; -import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.analysis.UserIdentity; -import org.apache.doris.catalog.AggregateType; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.FunctionGenTable; -import org.apache.doris.catalog.FunctionSet; -import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.catalog.Table; -import org.apache.doris.catalog.TableIf; -import org.apache.doris.catalog.external.HMSExternalTable; -import org.apache.doris.catalog.external.IcebergExternalTable; -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Config; -import org.apache.doris.common.UserException; -import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; -import org.apache.doris.load.BrokerFileGroup; -import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; -import org.apache.doris.planner.PlanNodeId; -import org.apache.doris.planner.external.iceberg.IcebergApiSource; -import org.apache.doris.planner.external.iceberg.IcebergHMSSource; -import org.apache.doris.planner.external.iceberg.IcebergScanProvider; -import org.apache.doris.planner.external.iceberg.IcebergSource; -import org.apache.doris.rewrite.ExprRewriter; -import org.apache.doris.statistics.StatisticalType; -import org.apache.doris.tablefunction.ExternalFileTableValuedFunction; -import org.apache.doris.thrift.TBrokerFileStatus; -import org.apache.doris.thrift.TExplainLevel; -import org.apache.doris.thrift.TExpr; -import org.apache.doris.thrift.TFileRangeDesc; -import org.apache.doris.thrift.TFileScanNode; -import org.apache.doris.thrift.TFileScanRangeParams; -import org.apache.doris.thrift.TFileScanSlotInfo; -import org.apache.doris.thrift.TFileType; -import org.apache.doris.thrift.TPlanNode; -import org.apache.doris.thrift.TPlanNodeType; -import org.apache.doris.thrift.TScanRangeLocations; -import org.apache.doris.thrift.TUniqueId; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * ExternalFileScanNode for the file access type of catalog, now only support - * hive,hudi and iceberg. 
- */ -public class ExternalFileScanNode extends ExternalScanNode { - private static final Logger LOG = LogManager.getLogger(ExternalFileScanNode.class); - - public static class ParamCreateContext { - public BrokerFileGroup fileGroup; - public List<Expr> conjuncts; - - public TupleDescriptor destTupleDescriptor; - public Map<String, SlotDescriptor> destSlotDescByName; - // === Set when init === - public TupleDescriptor srcTupleDescriptor; - public Map<String, SlotDescriptor> srcSlotDescByName; - public Map<String, Expr> exprMap; - public String timezone; - // === Set when init === - - public TFileScanRangeParams params; - - public void createDestSlotMap() { - Preconditions.checkNotNull(destTupleDescriptor); - destSlotDescByName = Maps.newHashMap(); - for (SlotDescriptor slot : destTupleDescriptor.getSlots()) { - destSlotDescByName.put(slot.getColumn().getName(), slot); - } - } - } - - public enum Type { - LOAD, QUERY - } - - private Type type = Type.QUERY; - private final FederationBackendPolicy backendPolicy = new FederationBackendPolicy(); - - // Only for load job. - // Save all info about load attributes and files. - // Each DataDescription in a load stmt conreponding to a FileGroupInfo in this list. - private List<FileGroupInfo> fileGroupInfos = Lists.newArrayList(); - // For query, there is only one FileScanProvider in this list. - // For load, the num of providers equals to the num of file group infos. - private List<FileScanProviderIf> scanProviders = Lists.newArrayList(); - // For query, there is only one ParamCreateContext in this list. - // For load, the num of ParamCreateContext equals to the num of file group infos. - private List<ParamCreateContext> contexts = Lists.newArrayList(); - - // Final output of this file scan node - private List<TScanRangeLocations> scanRangeLocations = Lists.newArrayList(); - - // For explain - private long inputSplitsNum = 0; - private long totalFileSize = 0; - private long totalPartitionNum = 0; - private long readPartitionNum = 0; - - /** - * External file scan node for: - * 1. Query hms table - * 2. Load from file - * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv - * eg: s3 tvf, load scan node. - * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check - */ - public ExternalFileScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { - super(id, desc, "EXTERNAL_FILE_SCAN_NODE", StatisticalType.FILE_SCAN_NODE, needCheckColumnPriv); - } - - // Only for broker load job. - public void setLoadInfo(long loadJobId, long txnId, Table targetTable, BrokerDesc brokerDesc, - List<BrokerFileGroup> fileGroups, List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded, - boolean strictMode, int loadParallelism, UserIdentity userIdentity) { - Preconditions.checkState(fileGroups.size() == fileStatusesList.size()); - for (int i = 0; i < fileGroups.size(); ++i) { - FileGroupInfo fileGroupInfo = new FileGroupInfo(loadJobId, txnId, targetTable, brokerDesc, - fileGroups.get(i), fileStatusesList.get(i), filesAdded, strictMode, loadParallelism); - fileGroupInfos.add(fileGroupInfo); - } - this.type = Type.LOAD; - } - - // Only for stream load/routine load job.
- public void setLoadInfo(TUniqueId loadId, long txnId, Table targetTable, BrokerDesc brokerDesc, - BrokerFileGroup fileGroup, TBrokerFileStatus fileStatus, boolean strictMode, TFileType fileType, - List<String> hiddenColumns) { - FileGroupInfo fileGroupInfo = new FileGroupInfo(loadId, txnId, targetTable, brokerDesc, - fileGroup, fileStatus, strictMode, fileType, hiddenColumns); - fileGroupInfos.add(fileGroupInfo); - this.type = Type.LOAD; - } - - @Override - public void init(Analyzer analyzer) throws UserException { - super.init(analyzer); - - if (!Config.enable_vectorized_load) { - throw new UserException( - "Please set 'enable_vectorized_load=true' in fe.conf to enable external file scan node"); - } - - switch (type) { - case QUERY: - // prepare for partition prune - computeColumnFilter(); - if (this.desc.getTable() instanceof HMSExternalTable) { - HMSExternalTable hmsTable = (HMSExternalTable) this.desc.getTable(); - initHMSExternalTable(hmsTable); - } else if (this.desc.getTable() instanceof FunctionGenTable) { - FunctionGenTable table = (FunctionGenTable) this.desc.getTable(); - initFunctionGenTable(table, (ExternalFileTableValuedFunction) table.getTvf()); - } else if (this.desc.getTable() instanceof IcebergExternalTable) { - IcebergExternalTable table = (IcebergExternalTable) this.desc.getTable(); - initIcebergExternalTable(table); - } - break; - case LOAD: - for (FileGroupInfo fileGroupInfo : fileGroupInfos) { - this.scanProviders.add(new LoadScanProvider(fileGroupInfo, desc)); - } - break; - default: - throw new UserException("Unknown type: " + type); - } - - backendPolicy.init(); - numNodes = backendPolicy.numBackends(); - - initParamCreateContexts(analyzer); - } - - /** - * Init ExternalFileScanNode, ONLY used for Nereids. Should NOT use this function in anywhere else. - */ - public void init() throws UserException { - if (!Config.enable_vectorized_load) { - throw new UserException( - "Please set 'enable_vectorized_load=true' in fe.conf to enable external file scan node"); - } - - switch (type) { - case QUERY: - // prepare for partition prune - // computeColumnFilter(); - if (this.desc.getTable() instanceof HMSExternalTable) { - HMSExternalTable hmsTable = (HMSExternalTable) this.desc.getTable(); - initHMSExternalTable(hmsTable); - } else if (this.desc.getTable() instanceof FunctionGenTable) { - FunctionGenTable table = (FunctionGenTable) this.desc.getTable(); - initFunctionGenTable(table, (ExternalFileTableValuedFunction) table.getTvf()); - } else if (this.desc.getTable() instanceof IcebergExternalTable) { - IcebergExternalTable table = (IcebergExternalTable) this.desc.getTable(); - initIcebergExternalTable(table); - } - break; - default: - throw new UserException("Unknown type: " + type); - } - - backendPolicy.init(); - numNodes = backendPolicy.numBackends(); - for (FileScanProviderIf scanProvider : scanProviders) { - ParamCreateContext context = scanProvider.createContext(analyzer); - context.createDestSlotMap(); - initAndSetPrecedingFilter(context.fileGroup.getPrecedingFilterExpr(), context.srcTupleDescriptor, analyzer); - initAndSetWhereExpr(context.fileGroup.getWhereExpr(), context.destTupleDescriptor, analyzer); - context.conjuncts = conjuncts; - this.contexts.add(context); - } - } - - /** - * Reset required_slots in contexts. This is called after Nereids planner do the projection. - * In the projection process, some slots may be removed. So call this to update the slots info.
- */ - @Override - public void updateRequiredSlots(PlanTranslatorContext planTranslatorContext, - Set<SlotId> requiredByProjectSlotIdSet) throws UserException { - for (int i = 0; i < contexts.size(); i++) { - ParamCreateContext context = contexts.get(i); - FileScanProviderIf scanProvider = scanProviders.get(i); - context.params.unsetRequiredSlots(); - for (SlotDescriptor slot : desc.getSlots()) { - if (!slot.isMaterialized()) { - continue; - } - - TFileScanSlotInfo slotInfo = new TFileScanSlotInfo(); - slotInfo.setSlotId(slot.getId().asInt()); - slotInfo.setIsFileSlot(!scanProvider.getPathPartitionKeys().contains(slot.getColumn().getName())); - context.params.addToRequiredSlots(slotInfo); - } - } - } - - private void initHMSExternalTable(HMSExternalTable hmsTable) throws UserException { - Preconditions.checkNotNull(hmsTable); - - if (hmsTable.isView()) { - throw new AnalysisException( - String.format("Querying external view '[%s].%s.%s' is not supported", hmsTable.getDlaType(), - hmsTable.getDbName(), hmsTable.getName())); - } - - FileScanProviderIf scanProvider; - switch (hmsTable.getDlaType()) { - case HUDI: - scanProvider = new HudiScanProvider(hmsTable, desc, columnNameToRange); - break; - case ICEBERG: - IcebergSource hmsSource = new IcebergHMSSource(hmsTable, desc, columnNameToRange); - scanProvider = new IcebergScanProvider(hmsSource, analyzer); - break; - case HIVE: - String inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat(); - if (inputFormat.contains("TextInputFormat")) { - for (SlotDescriptor slot : desc.getSlots()) { - if (!slot.getType().isScalarType()) { - throw new UserException("For column `" + slot.getColumn().getName() - + "`, The column types ARRAY/MAP/STRUCT are not supported yet" - + " for text input format of Hive. "); - } - } - } - scanProvider = new HiveScanProvider(hmsTable, desc, columnNameToRange); - break; - default: - throw new UserException("Unknown table type: " + hmsTable.getDlaType()); - } - this.scanProviders.add(scanProvider); - } - - private void initIcebergExternalTable(IcebergExternalTable icebergTable) throws UserException { - Preconditions.checkNotNull(icebergTable); - if (icebergTable.isView()) { - throw new AnalysisException( - String.format("Querying external view '%s.%s' is not supported", icebergTable.getDbName(), - icebergTable.getName())); - } - - FileScanProviderIf scanProvider; - String catalogType = icebergTable.getIcebergCatalogType(); - switch (catalogType) { - case IcebergExternalCatalog.ICEBERG_HMS: - case IcebergExternalCatalog.ICEBERG_REST: - case IcebergExternalCatalog.ICEBERG_DLF: - case IcebergExternalCatalog.ICEBERG_GLUE: - IcebergSource icebergSource = new IcebergApiSource( - icebergTable, desc, columnNameToRange); - scanProvider = new IcebergScanProvider(icebergSource, analyzer); - break; - default: - throw new UserException("Unknown iceberg catalog type: " + catalogType); - } - this.scanProviders.add(scanProvider); - } - - private void initFunctionGenTable(FunctionGenTable table, ExternalFileTableValuedFunction tvf) { - Preconditions.checkNotNull(table); - FileScanProviderIf scanProvider = new TVFScanProvider(table, desc, tvf); - this.scanProviders.add(scanProvider); - } - - // For each scan provider, create a corresponding ParamCreateContext - private void initParamCreateContexts(Analyzer analyzer) throws UserException { - for (FileScanProviderIf scanProvider : scanProviders) { - ParamCreateContext context = scanProvider.createContext(analyzer); - context.createDestSlotMap(); - // set where and preceding filter.
- // FIXME(cmy): we should support set different expr for different file group. - initAndSetPrecedingFilter(context.fileGroup.getPrecedingFilterExpr(), context.srcTupleDescriptor, analyzer); - initAndSetWhereExpr(context.fileGroup.getWhereExpr(), context.destTupleDescriptor, analyzer); - context.conjuncts = conjuncts; - this.contexts.add(context); - } - } - - private void initAndSetPrecedingFilter(Expr whereExpr, TupleDescriptor tupleDesc, Analyzer analyzer) - throws UserException { - if (type != Type.LOAD) { - return; - } - Expr newWhereExpr = initWhereExpr(whereExpr, tupleDesc, analyzer); - if (newWhereExpr != null) { - addPreFilterConjuncts(newWhereExpr.getConjuncts()); - } - } - - private void initAndSetWhereExpr(Expr whereExpr, TupleDescriptor tupleDesc, Analyzer analyzer) - throws UserException { - Expr newWhereExpr = initWhereExpr(whereExpr, tupleDesc, analyzer); - if (newWhereExpr != null) { - addConjuncts(newWhereExpr.getConjuncts()); - } - } - - private Expr initWhereExpr(Expr whereExpr, TupleDescriptor tupleDesc, Analyzer analyzer) throws UserException { - if (whereExpr == null) { - return null; - } - - Map<String, SlotDescriptor> dstDescMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - for (SlotDescriptor slotDescriptor : tupleDesc.getSlots()) { - dstDescMap.put(slotDescriptor.getColumn().getName(), slotDescriptor); - } - - // substitute SlotRef in filter expression - // where expr must be equal first to transfer some predicates(eg: BetweenPredicate to BinaryPredicate) - Expr newWhereExpr = analyzer.getExprRewriter() - .rewrite(whereExpr, analyzer, ExprRewriter.ClauseType.WHERE_CLAUSE); - List<SlotRef> slots = Lists.newArrayList(); - newWhereExpr.collect(SlotRef.class, slots); - - ExprSubstitutionMap smap = new ExprSubstitutionMap(); - for (SlotRef slot : slots) { - SlotDescriptor slotDesc = dstDescMap.get(slot.getColumnName()); - if (slotDesc == null) { - throw new UserException( - "unknown column reference in where statement, reference=" + slot.getColumnName()); - } - smap.getLhs().add(slot); - smap.getRhs().add(new SlotRef(slotDesc)); - } - newWhereExpr = newWhereExpr.clone(smap); - newWhereExpr.analyze(analyzer); - if (!newWhereExpr.getType().equals(org.apache.doris.catalog.Type.BOOLEAN)) { - throw new UserException("where statement is not a valid statement return bool"); - } - return newWhereExpr; - } - - @Override - public void finalize(Analyzer analyzer) throws UserException { - Preconditions.checkState(contexts.size() == scanProviders.size(), - contexts.size() + " vs. 
" + scanProviders.size()); - for (int i = 0; i < contexts.size(); ++i) { - ParamCreateContext context = contexts.get(i); - FileScanProviderIf scanProvider = scanProviders.get(i); - setDefaultValueExprs(scanProvider, context); - setColumnPositionMappingForTextFile(scanProvider, context); - finalizeParamsForLoad(context, analyzer); - createScanRangeLocations(context, scanProvider); - this.inputSplitsNum += scanProvider.getInputSplitNum(); - this.totalFileSize += scanProvider.getInputFileSize(); - TableIf table = desc.getTable(); - if (table instanceof HMSExternalTable) { - if (((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) { - genSlotToSchemaIdMap(context); - } - } - if (scanProvider instanceof HiveScanProvider) { - this.totalPartitionNum = ((HiveScanProvider) scanProvider).getTotalPartitionNum(); - this.readPartitionNum = ((HiveScanProvider) scanProvider).getReadPartitionNum(); - } - } - } - - @Override - public void finalizeForNereids() throws UserException { - Preconditions.checkState(contexts.size() == scanProviders.size(), - contexts.size() + " vs. " + scanProviders.size()); - for (int i = 0; i < contexts.size(); ++i) { - ParamCreateContext context = contexts.get(i); - FileScanProviderIf scanProvider = scanProviders.get(i); - setDefaultValueExprs(scanProvider, context); - setColumnPositionMappingForTextFile(scanProvider, context); - finalizeParamsForLoad(context, analyzer); - createScanRangeLocations(context, scanProvider); - this.inputSplitsNum += scanProvider.getInputSplitNum(); - this.totalFileSize += scanProvider.getInputFileSize(); - TableIf table = desc.getTable(); - if (table instanceof HMSExternalTable) { - if (((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) { - genSlotToSchemaIdMap(context); - } - } - if (scanProvider instanceof HiveScanProvider) { - this.totalPartitionNum = ((HiveScanProvider) scanProvider).getTotalPartitionNum(); - this.readPartitionNum = ((HiveScanProvider) scanProvider).getReadPartitionNum(); - } - } - } - - private void setColumnPositionMappingForTextFile(FileScanProviderIf scanProvider, ParamCreateContext context) - throws UserException { - if (type != Type.QUERY) { - return; - } - TableIf tbl = scanProvider.getTargetTable(); - List columnIdxs = Lists.newArrayList(); - - for (TFileScanSlotInfo slot : context.params.getRequiredSlots()) { - if (!slot.isIsFileSlot()) { - continue; - } - SlotDescriptor slotDesc = desc.getSlot(slot.getSlotId()); - String colName = slotDesc.getColumn().getName(); - int idx = tbl.getBaseColumnIdxByName(colName); - if (idx == -1) { - throw new UserException("Column " + colName + " not found in table " + tbl.getName()); - } - columnIdxs.add(idx); - } - context.params.setColumnIdxs(columnIdxs); - } - - protected void setDefaultValueExprs(FileScanProviderIf scanProvider, ParamCreateContext context) - throws UserException { - TableIf tbl = scanProvider.getTargetTable(); - Preconditions.checkNotNull(tbl); - TExpr tExpr = new TExpr(); - tExpr.setNodes(Lists.newArrayList()); - - for (Column column : tbl.getBaseSchema()) { - Expr expr; - if (column.getDefaultValue() != null) { - if (column.getDefaultValueExprDef() != null) { - expr = column.getDefaultValueExpr(); - } else { - expr = new StringLiteral(column.getDefaultValue()); - } - } else { - if (column.isAllowNull()) { - if (type == Type.LOAD) { - // In load process, the source type is string. 
- expr = NullLiteral.create(org.apache.doris.catalog.Type.VARCHAR); - } else { - expr = NullLiteral.create(column.getType()); - } - } else { - expr = null; - } - } - SlotDescriptor slotDesc = null; - switch (type) { - case LOAD: { - slotDesc = context.srcSlotDescByName.get(column.getName()); - break; - } - case QUERY: { - slotDesc = context.destSlotDescByName.get(column.getName()); - break; - } - default: - Preconditions.checkState(false, type); - } - // if slot desc is null, which mean it is an unrelated slot, just skip. - // eg: - // (a, b, c) set (x=a, y=b, z=c) - // c does not exist in file, the z will be filled with null, even if z has default value. - // and if z is not nullable, the load will fail. - if (slotDesc != null) { - if (expr != null) { - expr = castToSlot(slotDesc, expr); - context.params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), expr.treeToThrift()); - } else { - context.params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), tExpr); - } - } - } - } - - protected void finalizeParamsForLoad(ParamCreateContext context, Analyzer analyzer) throws UserException { - if (type != Type.LOAD) { - context.params.setSrcTupleId(-1); - return; - } - Map<String, SlotDescriptor> slotDescByName = context.srcSlotDescByName; - Map<String, Expr> exprMap = context.exprMap; - TupleDescriptor srcTupleDesc = context.srcTupleDescriptor; - boolean negative = context.fileGroup.isNegative(); - - TFileScanRangeParams params = context.params; - Map<Integer, Integer> destSidToSrcSidWithoutTrans = Maps.newHashMap(); - for (SlotDescriptor destSlotDesc : desc.getSlots()) { - if (!destSlotDesc.isMaterialized()) { - continue; - } - Expr expr = null; - if (exprMap != null) { - expr = exprMap.get(destSlotDesc.getColumn().getName()); - } - if (expr == null) { - SlotDescriptor srcSlotDesc = slotDescByName.get(destSlotDesc.getColumn().getName()); - if (srcSlotDesc != null) { - destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt()); - // If dest is allow null, we set source to nullable - if (destSlotDesc.getColumn().isAllowNull()) { - srcSlotDesc.setIsNullable(true); - } - expr = new SlotRef(srcSlotDesc); - } else { - Column column = destSlotDesc.getColumn(); - if (column.getDefaultValue() != null) { - if (column.getDefaultValueExprDef() != null) { - expr = column.getDefaultValueExpr(); - } else { - expr = new StringLiteral(destSlotDesc.getColumn().getDefaultValue()); - } - } else { - if (column.isAllowNull()) { - expr = NullLiteral.create(column.getType()); - } else { - throw new AnalysisException("column has no source field, column=" + column.getName()); - } - } - } - } - - // check hll_hash - if (destSlotDesc.getType().getPrimitiveType() == PrimitiveType.HLL) { - if (!(expr instanceof FunctionCallExpr)) { - throw new AnalysisException("HLL column must use " + FunctionSet.HLL_HASH + " function, like " - + destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_HASH + "(xxx)"); - } - FunctionCallExpr fn = (FunctionCallExpr) expr; - if (!fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_HASH) && !fn.getFnName() - .getFunction().equalsIgnoreCase("hll_empty")) { - throw new AnalysisException("HLL column must use " + FunctionSet.HLL_HASH + " function, like " - + destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_HASH + "(xxx) or " - + destSlotDesc.getColumn().getName() + "=hll_empty()"); - } - expr.setType(org.apache.doris.catalog.Type.HLL); - } - - checkBitmapCompatibility(analyzer, destSlotDesc, expr); - checkQuantileStateCompatibility(analyzer, destSlotDesc, expr); - - if (negative &&
destSlotDesc.getColumn().getAggregationType() == AggregateType.SUM) { - expr = new ArithmeticExpr(ArithmeticExpr.Operator.MULTIPLY, expr, new IntLiteral(-1)); - expr.analyze(analyzer); - } - - // for jsonb type, use jsonb_parse_xxx to parse src string to jsonb. - // and if input string is not a valid json string, return null. - PrimitiveType dstType = destSlotDesc.getType().getPrimitiveType(); - PrimitiveType srcType = expr.getType().getPrimitiveType(); - if (dstType == PrimitiveType.JSONB - && (srcType == PrimitiveType.VARCHAR || srcType == PrimitiveType.STRING)) { - List<Expr> args = Lists.newArrayList(); - args.add(expr); - String nullable = "notnull"; - if (destSlotDesc.getIsNullable() || expr.isNullable()) { - nullable = "nullable"; - } - String name = "jsonb_parse_" + nullable + "_error_to_null"; - expr = new FunctionCallExpr(name, args); - expr.analyze(analyzer); - } else if (dstType == PrimitiveType.VARIANT) { - // Generate SchemaChange expr for dynamicly generating columns - TableIf targetTbl = desc.getTable(); - expr = new SchemaChangeExpr((SlotRef) expr, (int) targetTbl.getId()); - } else { - expr = castToSlot(destSlotDesc, expr); - } - params.putToExprOfDestSlot(destSlotDesc.getId().asInt(), expr.treeToThrift()); - } - params.setDestSidToSrcSidWithoutTrans(destSidToSrcSidWithoutTrans); - params.setDestTupleId(desc.getId().asInt()); - params.setSrcTupleId(srcTupleDesc.getId().asInt()); - - // Need re compute memory layout after set some slot descriptor to nullable - srcTupleDesc.computeStatAndMemLayout(); - - if (!preFilterConjuncts.isEmpty()) { - Expr vPreFilterExpr = convertConjunctsToAndCompoundPredicate(preFilterConjuncts); - initCompoundPredicate(vPreFilterExpr); - params.setPreFilterExprs(vPreFilterExpr.treeToThrift()); - } - } - - protected void checkBitmapCompatibility(Analyzer analyzer, SlotDescriptor slotDesc, Expr expr) - throws AnalysisException { - if (slotDesc.getColumn().getAggregationType() == AggregateType.BITMAP_UNION) { - expr.analyze(analyzer); - if (!expr.getType().isBitmapType()) { - String errorMsg = String.format("bitmap column %s require the function return type is BITMAP", - slotDesc.getColumn().getName()); - throw new AnalysisException(errorMsg); - } - } - } - - protected void checkQuantileStateCompatibility(Analyzer analyzer, SlotDescriptor slotDesc, Expr expr) - throws AnalysisException { - if (slotDesc.getColumn().getAggregationType() == AggregateType.QUANTILE_UNION) { - expr.analyze(analyzer); - if (!expr.getType().isQuantileStateType()) { - String errorMsg = "quantile_state column %s require the function return type is QUANTILE_STATE"; - throw new AnalysisException(errorMsg); - } - } - } - - private void createScanRangeLocations(ParamCreateContext context, FileScanProviderIf scanProvider) - throws UserException { - scanProvider.createScanRangeLocations(context, backendPolicy, scanRangeLocations); - } - - private void genSlotToSchemaIdMap(ParamCreateContext context) { - List<Column> baseSchema = desc.getTable().getBaseSchema(); - Map<String, Integer> columnNameToPosition = Maps.newHashMap(); - for (SlotDescriptor slot : desc.getSlots()) { - int idx = 0; - for (Column col : baseSchema) { - if (col.getName().equals(slot.getColumn().getName())) { - columnNameToPosition.put(col.getName(), idx); - break; - } - idx += 1; - } - } - context.params.setSlotNameToSchemaPos(columnNameToPosition); - } - - @Override - public int getNumInstances() { - return scanRangeLocations.size(); - } - - @Override - protected void toThrift(TPlanNode planNode) { -
planNode.setNodeType(TPlanNodeType.FILE_SCAN_NODE); - TFileScanNode fileScanNode = new TFileScanNode(); - fileScanNode.setTupleId(desc.getId().asInt()); - planNode.setFileScanNode(fileScanNode); - } - - @Override - public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) { - LOG.debug("There is {} scanRangeLocations for execution.", scanRangeLocations.size()); - return scanRangeLocations; - } - - @Override - public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { - StringBuilder output = new StringBuilder(); - output.append(prefix).append("table: ").append(desc.getTable().getName()).append("\n"); - if (!conjuncts.isEmpty()) { - output.append(prefix).append("predicates: ").append(getExplainString(conjuncts)).append("\n"); - } - if (!runtimeFilters.isEmpty()) { - output.append(prefix).append("runtime filters: "); - output.append(getRuntimeFilterExplainString(false)); - } - - output.append(prefix).append("inputSplitNum=").append(inputSplitsNum).append(", totalFileSize=") - .append(totalFileSize).append(", scanRanges=").append(scanRangeLocations.size()).append("\n"); - output.append(prefix).append("partition=").append(readPartitionNum).append("/").append(totalPartitionNum) - .append("\n"); - - if (detailLevel == TExplainLevel.VERBOSE) { - output.append(prefix).append("backends:").append("\n"); - Multimap<Long, TFileRangeDesc> scanRangeLocationsMap = ArrayListMultimap.create(); - // 1. group by backend id - for (TScanRangeLocations locations : scanRangeLocations) { - scanRangeLocationsMap.putAll(locations.getLocations().get(0).backend_id, - locations.getScanRange().getExtScanRange().getFileScanRange().getRanges()); - } - for (long beId : scanRangeLocationsMap.keySet()) { - output.append(prefix).append(" ").append(beId).append("\n"); - List<TFileRangeDesc> fileRangeDescs = Lists.newArrayList(scanRangeLocationsMap.get(beId)); - // 2. sort by file start offset - Collections.sort(fileRangeDescs, new Comparator<TFileRangeDesc>() { - @Override - public int compare(TFileRangeDesc o1, TFileRangeDesc o2) { - return Long.compare(o1.getStartOffset(), o2.getStartOffset()); - } - }); - // 3. if size <= 4, print all. if size > 4, print first 3 and last 1 - int size = fileRangeDescs.size(); - if (size <= 4) { - for (TFileRangeDesc file : fileRangeDescs) { - output.append(prefix).append(" ").append(file.getPath()) - .append(" start: ").append(file.getStartOffset()) - .append(" length: ").append(file.getSize()) - .append("\n"); - } - } else { - for (int i = 0; i < 3; i++) { - TFileRangeDesc file = fileRangeDescs.get(i); - output.append(prefix).append(" ").append(file.getPath()) - .append(" start: ").append(file.getStartOffset()) - .append(" length: ").append(file.getSize()) - .append("\n"); - } - int other = size - 4; - output.append(prefix).append(" ...
other ").append(other).append(" files ...\n"); - TFileRangeDesc file = fileRangeDescs.get(size - 1); - output.append(prefix).append(" ").append(file.getPath()) - .append(" start: ").append(file.getStartOffset()) - .append(" length: ").append(file.getSize()) - .append("\n"); - } - } - } - - output.append(prefix); - if (cardinality > 0) { - output.append(String.format("cardinality=%s, ", cardinality)); - } - if (avgRowSize > 0) { - output.append(String.format("avgRowSize=%s, ", avgRowSize)); - } - output.append(String.format("numNodes=%s", numNodes)).append("\n"); - - return output.toString(); - } -} - diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java index 8eabeb9d5d..2fb3842b40 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java @@ -21,9 +21,12 @@ import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; import org.apache.doris.statistics.StatisticalType; -import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TScanRangeLocations; +import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.List; /** @@ -33,10 +36,20 @@ import java.util.List; * For example: * hive, iceberg, hudi, es, odbc */ -public class ExternalScanNode extends ScanNode { +public abstract class ExternalScanNode extends ScanNode { + private static final Logger LOG = LogManager.getLogger(ExternalScanNode.class); // set to false means this scan node does not need to check column priv. 
- private boolean needCheckColumnPriv; + protected boolean needCheckColumnPriv; + + // For explain + protected long inputSplitsNum = 0; + protected long totalFileSize = 0; + protected long totalPartitionNum = 0; + protected long readPartitionNum = 0; + + // Final output of this file scan node + protected List<TScanRangeLocations> scanRangeLocations = Lists.newArrayList(); public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) { @@ -46,16 +59,17 @@ public class ExternalScanNode extends ScanNode { @Override public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) { - return null; - } - - @Override - protected void toThrift(TPlanNode msg) { - + LOG.debug("There is {} scanRangeLocations for execution.", scanRangeLocations.size()); + return scanRangeLocations; } @Override public boolean needToCheckColumnPriv() { return this.needCheckColumnPriv; } + + @Override + public int getNumInstances() { + return scanRangeLocations.size(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java index 770ebd9a7a..ca14046c76 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java @@ -29,7 +29,6 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.common.util.Util; import org.apache.doris.load.BrokerFileGroup; -import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TExternalScanRange; @@ -188,8 +187,9 @@ public class FileGroupInfo { LOG.info("number instance of file scan node is: {}, bytes per instance: {}", numInstances, bytesPerInstance); } - public void createScanRangeLocations(ParamCreateContext context, FederationBackendPolicy backendPolicy, - List<TScanRangeLocations> scanRangeLocations) throws UserException { + public void createScanRangeLocations(FileScanNode.ParamCreateContext context, + FederationBackendPolicy backendPolicy, + List<TScanRangeLocations> scanRangeLocations) throws UserException { TScanRangeLocations curLocations = newLocations(context.params, brokerDesc, backendPolicy); long curInstanceBytes = 0; long curFileOffset = 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java new file mode 100644 index 0000000000..de523ea8e5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -0,0 +1,326 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.NullLiteral; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotId; +import org.apache.doris.analysis.StringLiteral; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.FunctionGenTable; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.catalog.external.IcebergExternalTable; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; +import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.planner.external.iceberg.IcebergApiSource; +import org.apache.doris.planner.external.iceberg.IcebergHMSSource; +import org.apache.doris.planner.external.iceberg.IcebergScanProvider; +import org.apache.doris.planner.external.iceberg.IcebergSource; +import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.tablefunction.ExternalFileTableValuedFunction; +import org.apache.doris.thrift.TExpr; +import org.apache.doris.thrift.TFileScanSlotInfo; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * FileQueryScanNode for the file access type of catalog; currently only supports + * hive, hudi and iceberg. + */ +public class FileQueryScanNode extends FileScanNode { + private static final Logger LOG = LogManager.getLogger(FileQueryScanNode.class); + + // For query, there is only one FileScanProvider. + private FileScanProviderIf scanProvider; + // For query, there is only one ParamCreateContext.
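As the provider and context fields around this point and the init() methods below show, a FileQueryScanNode holds exactly one scanProvider and one context per query, chosen by the concrete table type. A hedged sketch of that one-table-one-provider dispatch; the type and method names here are stand-ins for illustration, not Doris APIs:

    // Stand-in provider hierarchy; illustration only.
    interface ScanProvider {
        String name();
    }

    final class HiveProvider implements ScanProvider {
        public String name() { return "hive"; }
    }

    final class IcebergProvider implements ScanProvider {
        public String name() { return "iceberg"; }
    }

    final class TvfProvider implements ScanProvider {
        public String name() { return "tvf"; }
    }

    public class ProviderDispatch {
        enum TableKind { HMS_HIVE, ICEBERG, FUNCTION_GEN }

        // One table maps to exactly one provider, mirroring the single
        // scanProvider/context fields of FileQueryScanNode.
        static ScanProvider pick(TableKind kind) {
            switch (kind) {
                case HMS_HIVE:
                    return new HiveProvider();
                case ICEBERG:
                    return new IcebergProvider();
                case FUNCTION_GEN:
                    return new TvfProvider();
                default:
                    throw new IllegalArgumentException("Unknown table type: " + kind);
            }
        }

        public static void main(String[] args) {
            System.out.println(pick(TableKind.ICEBERG).name()); // prints iceberg
        }
    }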
+ private ParamCreateContext context; + + /** + * External file scan node for querying an HMS table. + * needCheckColumnPriv: some FileQueryScanNodes do not need to check column priv, + * e.g. the s3 tvf. + * These scan nodes have no corresponding catalog/database/table info, so no priv check is needed. + */ + public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { + super(id, desc, "FILE_QUERY_SCAN_NODE", StatisticalType.FILE_SCAN_NODE, needCheckColumnPriv); + } + + @Override + public void init(Analyzer analyzer) throws UserException { + super.init(analyzer); + + computeColumnFilter(); + if (this.desc.getTable() instanceof HMSExternalTable) { + HMSExternalTable hmsTable = (HMSExternalTable) this.desc.getTable(); + initHMSExternalTable(hmsTable); + } else if (this.desc.getTable() instanceof FunctionGenTable) { + FunctionGenTable table = (FunctionGenTable) this.desc.getTable(); + initFunctionGenTable(table, (ExternalFileTableValuedFunction) table.getTvf()); + } else if (this.desc.getTable() instanceof IcebergExternalTable) { + IcebergExternalTable table = (IcebergExternalTable) this.desc.getTable(); + initIcebergExternalTable(table); + } + backendPolicy.init(); + numNodes = backendPolicy.numBackends(); + initParamCreateContexts(analyzer); + } + + /** + * Init FileQueryScanNode; ONLY used for Nereids. This function should NOT be used anywhere else. + */ + public void init() throws UserException { + // prepare for partition prune + // computeColumnFilter(); + if (this.desc.getTable() instanceof HMSExternalTable) { + HMSExternalTable hmsTable = (HMSExternalTable) this.desc.getTable(); + initHMSExternalTable(hmsTable); + } else if (this.desc.getTable() instanceof FunctionGenTable) { + FunctionGenTable table = (FunctionGenTable) this.desc.getTable(); + initFunctionGenTable(table, (ExternalFileTableValuedFunction) table.getTvf()); + } else if (this.desc.getTable() instanceof IcebergExternalTable) { + IcebergExternalTable table = (IcebergExternalTable) this.desc.getTable(); + initIcebergExternalTable(table); + } + + backendPolicy.init(); + numNodes = backendPolicy.numBackends(); + context = scanProvider.createContext(analyzer); + context.createDestSlotMap(); + context.conjuncts = conjuncts; + } + + /** + * Reset required_slots in contexts. This is called after the Nereids planner does the projection. + * In the projection process, some slots may be removed. So call this to update the slots info.
+ */ + @Override + public void updateRequiredSlots(PlanTranslatorContext planTranslatorContext, + Set<SlotId> requiredByProjectSlotIdSet) throws UserException { + context.params.unsetRequiredSlots(); + for (SlotDescriptor slot : desc.getSlots()) { + if (!slot.isMaterialized()) { + continue; + } + + TFileScanSlotInfo slotInfo = new TFileScanSlotInfo(); + slotInfo.setSlotId(slot.getId().asInt()); + slotInfo.setIsFileSlot(!scanProvider.getPathPartitionKeys().contains(slot.getColumn().getName())); + context.params.addToRequiredSlots(slotInfo); + } + } + + private void initHMSExternalTable(HMSExternalTable hmsTable) throws UserException { + Preconditions.checkNotNull(hmsTable); + + if (hmsTable.isView()) { + throw new AnalysisException( + String.format("Querying external view '[%s].%s.%s' is not supported", hmsTable.getDlaType(), + hmsTable.getDbName(), hmsTable.getName())); + } + + switch (hmsTable.getDlaType()) { + case HUDI: + scanProvider = new HudiScanProvider(hmsTable, desc, columnNameToRange); + break; + case ICEBERG: + IcebergSource hmsSource = new IcebergHMSSource(hmsTable, desc, columnNameToRange); + scanProvider = new IcebergScanProvider(hmsSource, analyzer); + break; + case HIVE: + String inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat(); + if (inputFormat.contains("TextInputFormat")) { + for (SlotDescriptor slot : desc.getSlots()) { + if (!slot.getType().isScalarType()) { + throw new UserException("For column `" + slot.getColumn().getName() + + "`, The column types ARRAY/MAP/STRUCT are not supported yet" + + " for text input format of Hive. "); + } + } + } + scanProvider = new HiveScanProvider(hmsTable, desc, columnNameToRange); + break; + default: + throw new UserException("Unknown table type: " + hmsTable.getDlaType()); + } + } + + private void initIcebergExternalTable(IcebergExternalTable icebergTable) throws UserException { + Preconditions.checkNotNull(icebergTable); + if (icebergTable.isView()) { + throw new AnalysisException( + String.format("Querying external view '%s.%s' is not supported", icebergTable.getDbName(), + icebergTable.getName())); + } + + String catalogType = icebergTable.getIcebergCatalogType(); + switch (catalogType) { + case IcebergExternalCatalog.ICEBERG_HMS: + case IcebergExternalCatalog.ICEBERG_REST: + case IcebergExternalCatalog.ICEBERG_DLF: + case IcebergExternalCatalog.ICEBERG_GLUE: + IcebergSource icebergSource = new IcebergApiSource( + icebergTable, desc, columnNameToRange); + scanProvider = new IcebergScanProvider(icebergSource, analyzer); + break; + default: + throw new UserException("Unknown iceberg catalog type: " + catalogType); + } + } + + private void initFunctionGenTable(FunctionGenTable table, ExternalFileTableValuedFunction tvf) { + Preconditions.checkNotNull(table); + scanProvider = new TVFScanProvider(table, desc, tvf); + } + + // Create a corresponding ParamCreateContext + private void initParamCreateContexts(Analyzer analyzer) throws UserException { + context = scanProvider.createContext(analyzer); + context.createDestSlotMap(); + context.conjuncts = conjuncts; + } + + @Override + public void finalize(Analyzer analyzer) throws UserException { + setDefaultValueExprs(scanProvider, context); + setColumnPositionMappingForTextFile(scanProvider, context); + context.params.setSrcTupleId(-1); + createScanRangeLocations(context, scanProvider); + this.inputSplitsNum += scanProvider.getInputSplitNum(); + this.totalFileSize += scanProvider.getInputFileSize(); + TableIf table = desc.getTable(); + if (table instanceof HMSExternalTable) { + if
(((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) { + genSlotToSchemaIdMap(context); + } + } + if (scanProvider instanceof HiveScanProvider) { + this.totalPartitionNum = ((HiveScanProvider) scanProvider).getTotalPartitionNum(); + this.readPartitionNum = ((HiveScanProvider) scanProvider).getReadPartitionNum(); + } + } + + @Override + public void finalizeForNereids() throws UserException { + setDefaultValueExprs(scanProvider, context); + setColumnPositionMappingForTextFile(scanProvider, context); + context.params.setSrcTupleId(-1); + createScanRangeLocations(context, scanProvider); + this.inputSplitsNum += scanProvider.getInputSplitNum(); + this.totalFileSize += scanProvider.getInputFileSize(); + TableIf table = desc.getTable(); + if (table instanceof HMSExternalTable) { + if (((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) { + genSlotToSchemaIdMap(context); + } + } + if (scanProvider instanceof HiveScanProvider) { + this.totalPartitionNum = ((HiveScanProvider) scanProvider).getTotalPartitionNum(); + this.readPartitionNum = ((HiveScanProvider) scanProvider).getReadPartitionNum(); + } + } + + private void setColumnPositionMappingForTextFile(FileScanProviderIf scanProvider, ParamCreateContext context) + throws UserException { + TableIf tbl = scanProvider.getTargetTable(); + List<Integer> columnIdxs = Lists.newArrayList(); + + for (TFileScanSlotInfo slot : context.params.getRequiredSlots()) { + if (!slot.isIsFileSlot()) { + continue; + } + SlotDescriptor slotDesc = desc.getSlot(slot.getSlotId()); + String colName = slotDesc.getColumn().getName(); + int idx = tbl.getBaseColumnIdxByName(colName); + if (idx == -1) { + throw new UserException("Column " + colName + " not found in table " + tbl.getName()); + } + columnIdxs.add(idx); + } + context.params.setColumnIdxs(columnIdxs); + } + + protected void setDefaultValueExprs(FileScanProviderIf scanProvider, ParamCreateContext context) + throws UserException { + TableIf tbl = scanProvider.getTargetTable(); + Preconditions.checkNotNull(tbl); + TExpr tExpr = new TExpr(); + tExpr.setNodes(Lists.newArrayList()); + + for (Column column : tbl.getBaseSchema()) { + Expr expr; + if (column.getDefaultValue() != null) { + if (column.getDefaultValueExprDef() != null) { + expr = column.getDefaultValueExpr(); + } else { + expr = new StringLiteral(column.getDefaultValue()); + } + } else { + if (column.isAllowNull()) { + expr = NullLiteral.create(column.getType()); + } else { + expr = null; + } + } + SlotDescriptor slotDesc = context.destSlotDescByName.get(column.getName()); + // if slot desc is null, which means it is an unrelated slot, just skip. + // eg: + // (a, b, c) set (x=a, y=b, z=c) + // c does not exist in file, the z will be filled with null, even if z has default value. + // and if z is not nullable, the load will fail.
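The comment above states the rule that setDefaultValueExprs() implements as the hunk continues below: use the column's default expression if one is defined, otherwise fill with NULL when the column is nullable, and otherwise leave no usable default so a missing value fails the load. A tiny standalone sketch of just that decision; using String as a stand-in for the real Expr type is an assumption made here for illustration:

    import java.util.Optional;

    public class DefaultValueRule {
        // Returns the expression used to fill a column that is missing from the file.
        static Optional<String> defaultExprFor(String defaultValue, boolean nullable) {
            if (defaultValue != null) {
                return Optional.of("'" + defaultValue + "'"); // declared default wins
            }
            if (nullable) {
                return Optional.of("NULL"); // nullable column falls back to NULL
            }
            return Optional.empty(); // no default: a missing value fails the load
        }

        public static void main(String[] args) {
            System.out.println(defaultExprFor("0", false));  // Optional['0']
            System.out.println(defaultExprFor(null, true));  // Optional[NULL]
            System.out.println(defaultExprFor(null, false)); // Optional.empty
        }
    }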
+ if (slotDesc != null) { + if (expr != null) { + expr = castToSlot(slotDesc, expr); + context.params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), expr.treeToThrift()); + } else { + context.params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), tExpr); + } + } + } + } + + private void genSlotToSchemaIdMap(ParamCreateContext context) { + List<Column> baseSchema = desc.getTable().getBaseSchema(); + Map<String, Integer> columnNameToPosition = Maps.newHashMap(); + for (SlotDescriptor slot : desc.getSlots()) { + int idx = 0; + for (Column col : baseSchema) { + if (col.getName().equals(slot.getColumn().getName())) { + columnNameToPosition.put(col.getName(), idx); + break; + } + idx += 1; + } + } + context.params.setSlotNameToSchemaPos(columnNameToPosition); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java new file mode 100644 index 0000000000..56ea637d1a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java @@ -0,0 +1,173 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.common.UserException; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TFileRangeDesc; +import org.apache.doris.thrift.TFileScanNode; +import org.apache.doris.thrift.TFileScanRangeParams; +import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TPlanNodeType; +import org.apache.doris.thrift.TScanRangeLocations; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +/** + * Base class for External File Scan, including external query and load.
+ */ +public class FileScanNode extends ExternalScanNode { + private static final Logger LOG = LogManager.getLogger(FileScanNode.class); + + public static class ParamCreateContext { + public BrokerFileGroup fileGroup; + public List<Expr> conjuncts; + + public TupleDescriptor destTupleDescriptor; + public Map<String, SlotDescriptor> destSlotDescByName; + // === Set when init === + public TupleDescriptor srcTupleDescriptor; + public Map<String, SlotDescriptor> srcSlotDescByName; + public Map<String, Expr> exprMap; + public String timezone; + // === Set when init === + + public TFileScanRangeParams params; + + public void createDestSlotMap() { + Preconditions.checkNotNull(destTupleDescriptor); + destSlotDescByName = Maps.newHashMap(); + for (SlotDescriptor slot : destTupleDescriptor.getSlots()) { + destSlotDescByName.put(slot.getColumn().getName(), slot); + } + } + } + + protected final FederationBackendPolicy backendPolicy = new FederationBackendPolicy(); + + public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, + boolean needCheckColumnPriv) { + super(id, desc, planNodeName, statisticalType, needCheckColumnPriv); + this.needCheckColumnPriv = needCheckColumnPriv; + } + + @Override + protected void toThrift(TPlanNode planNode) { + planNode.setNodeType(TPlanNodeType.FILE_SCAN_NODE); + TFileScanNode fileScanNode = new TFileScanNode(); + fileScanNode.setTupleId(desc.getId().asInt()); + planNode.setFileScanNode(fileScanNode); + } + + @Override + public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { + StringBuilder output = new StringBuilder(); + output.append(prefix).append("table: ").append(desc.getTable().getName()).append("\n"); + if (!conjuncts.isEmpty()) { + output.append(prefix).append("predicates: ").append(getExplainString(conjuncts)).append("\n"); + } + if (!runtimeFilters.isEmpty()) { + output.append(prefix).append("runtime filters: "); + output.append(getRuntimeFilterExplainString(false)); + } + + output.append(prefix).append("inputSplitNum=").append(inputSplitsNum).append(", totalFileSize=") + .append(totalFileSize).append(", scanRanges=").append(scanRangeLocations.size()).append("\n"); + output.append(prefix).append("partition=").append(readPartitionNum).append("/").append(totalPartitionNum) + .append("\n"); + + if (detailLevel == TExplainLevel.VERBOSE) { + output.append(prefix).append("backends:").append("\n"); + Multimap<Long, TFileRangeDesc> scanRangeLocationsMap = ArrayListMultimap.create(); + // 1. group by backend id + for (TScanRangeLocations locations : scanRangeLocations) { + scanRangeLocationsMap.putAll(locations.getLocations().get(0).backend_id, + locations.getScanRange().getExtScanRange().getFileScanRange().getRanges()); + } + for (long beId : scanRangeLocationsMap.keySet()) { + output.append(prefix).append(" ").append(beId).append("\n"); + List<TFileRangeDesc> fileRangeDescs = Lists.newArrayList(scanRangeLocationsMap.get(beId)); + // 2. sort by file start offset + Collections.sort(fileRangeDescs, new Comparator<TFileRangeDesc>() { + @Override + public int compare(TFileRangeDesc o1, TFileRangeDesc o2) { + return Long.compare(o1.getStartOffset(), o2.getStartOffset()); + } + }); + // 3. if size <= 4, print all.
if size > 4, print first 3 and last 1 + int size = fileRangeDescs.size(); + if (size <= 4) { + for (TFileRangeDesc file : fileRangeDescs) { + output.append(prefix).append(" ").append(file.getPath()) + .append(" start: ").append(file.getStartOffset()) + .append(" length: ").append(file.getSize()) + .append("\n"); + } + } else { + for (int i = 0; i < 3; i++) { + TFileRangeDesc file = fileRangeDescs.get(i); + output.append(prefix).append(" ").append(file.getPath()) + .append(" start: ").append(file.getStartOffset()) + .append(" length: ").append(file.getSize()) + .append("\n"); + } + int other = size - 4; + output.append(prefix).append(" ... other ").append(other).append(" files ...\n"); + TFileRangeDesc file = fileRangeDescs.get(size - 1); + output.append(prefix).append(" ").append(file.getPath()) + .append(" start: ").append(file.getStartOffset()) + .append(" length: ").append(file.getSize()) + .append("\n"); + } + } + } + + output.append(prefix); + if (cardinality > 0) { + output.append(String.format("cardinality=%s, ", cardinality)); + } + if (avgRowSize > 0) { + output.append(String.format("avgRowSize=%s, ", avgRowSize)); + } + output.append(String.format("numNodes=%s", numNodes)).append("\n"); + + return output.toString(); + } + + protected void createScanRangeLocations(ParamCreateContext context, FileScanProviderIf scanProvider) + throws UserException { + scanProvider.createScanRangeLocations(context, backendPolicy, scanRangeLocations); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java index d85b0eb073..d20f6cd861 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java @@ -22,7 +22,6 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; -import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TScanRangeLocations; @@ -42,10 +41,10 @@ public interface FileScanProviderIf { List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException; - ParamCreateContext createContext(Analyzer analyzer) throws UserException; + FileScanNode.ParamCreateContext createContext(Analyzer analyzer) throws UserException; - void createScanRangeLocations(ParamCreateContext context, FederationBackendPolicy backendPolicy, - List<TScanRangeLocations> scanRangeLocations) throws UserException; + void createScanRangeLocations(FileScanNode.ParamCreateContext context, FederationBackendPolicy backendPolicy, + List<TScanRangeLocations> scanRangeLocations) throws UserException; int getInputSplitNum(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java index 5b0baf93ed..ca776be5f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java @@ -30,7 +30,6 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.planner.ColumnRange; -import
org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; import org.apache.doris.thrift.TFileAttributes; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileScanRangeParams; @@ -153,8 +152,8 @@ public class HiveScanProvider extends HMSTableScanProvider { } @Override - public ParamCreateContext createContext(Analyzer analyzer) throws UserException { - ParamCreateContext context = new ParamCreateContext(); + public FileScanNode.ParamCreateContext createContext(Analyzer analyzer) throws UserException { + FileScanNode.ParamCreateContext context = new FileScanNode.ParamCreateContext(); context.params = new TFileScanRangeParams(); context.destTupleDescriptor = desc; context.params.setDestTupleId(desc.getId().asInt()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java index 17051061a2..8a5fa8a16b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java @@ -35,7 +35,6 @@ import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.Load; import org.apache.doris.load.loadv2.LoadTask; -import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TFileAttributes; @@ -85,8 +84,8 @@ public class LoadScanProvider implements FileScanProviderIf { } @Override - public ParamCreateContext createContext(Analyzer analyzer) throws UserException { - ParamCreateContext ctx = new ParamCreateContext(); + public FileScanNode.ParamCreateContext createContext(Analyzer analyzer) throws UserException { + FileScanNode.ParamCreateContext ctx = new FileScanNode.ParamCreateContext(); ctx.destTupleDescriptor = destTupleDesc; ctx.fileGroup = fileGroupInfo.getFileGroup(); ctx.timezone = analyzer.getTimezone(); @@ -138,8 +137,9 @@ public class LoadScanProvider implements FileScanProviderIf { } @Override - public void createScanRangeLocations(ParamCreateContext context, FederationBackendPolicy backendPolicy, - List<TScanRangeLocations> scanRangeLocations) throws UserException { + public void createScanRangeLocations(FileScanNode.ParamCreateContext context, + FederationBackendPolicy backendPolicy, + List<TScanRangeLocations> scanRangeLocations) throws UserException { Preconditions.checkNotNull(fileGroupInfo); fileGroupInfo.getFileStatusAndCalcInstance(backendPolicy); fileGroupInfo.createScanRangeLocations(context, backendPolicy, scanRangeLocations); @@ -169,7 +169,7 @@ public class LoadScanProvider implements FileScanProviderIf { * @param context * @throws UserException */ - private void initColumns(ParamCreateContext context, Analyzer analyzer) throws UserException { + private void initColumns(FileScanNode.ParamCreateContext context, Analyzer analyzer) throws UserException { context.srcTupleDescriptor = analyzer.getDescTbl().createTupleDescriptor(); context.srcSlotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); context.exprMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java index 0fdfe0943f..be7e5bb43e 100644 ---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java @@ -26,7 +26,6 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.planner.Split; import org.apache.doris.planner.Splitter; -import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; import org.apache.doris.planner.external.iceberg.IcebergScanProvider; import org.apache.doris.planner.external.iceberg.IcebergSplit; import org.apache.doris.system.Backend; @@ -59,8 +58,9 @@ public abstract class QueryScanProvider implements FileScanProviderIf { public abstract TFileAttributes getFileAttributes() throws UserException; @Override - public void createScanRangeLocations(ParamCreateContext context, FederationBackendPolicy backendPolicy, - List<TScanRangeLocations> scanRangeLocations) throws UserException { + public void createScanRangeLocations(FileScanNode.ParamCreateContext context, + FederationBackendPolicy backendPolicy, + List<TScanRangeLocations> scanRangeLocations) throws UserException { long start = System.currentTimeMillis(); List<Split> inputSplits = splitter.getSplits(context.conjuncts); this.inputSplitNum = inputSplits.size(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java index 48365a7656..cc045f00ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java @@ -27,7 +27,6 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.load.BrokerFileGroup; -import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; import org.apache.doris.tablefunction.ExternalFileTableValuedFunction; import org.apache.doris.thrift.TFileAttributes; import org.apache.doris.thrift.TFileFormatType; @@ -86,8 +85,8 @@ public class TVFScanProvider extends QueryScanProvider { } @Override - public ParamCreateContext createContext(Analyzer analyzer) throws UserException { - ParamCreateContext context = new ParamCreateContext(); + public FileScanNode.ParamCreateContext createContext(Analyzer analyzer) throws UserException { + FileScanNode.ParamCreateContext context = new FileScanNode.ParamCreateContext(); context.params = new TFileScanRangeParams(); context.destTupleDescriptor = desc; context.params.setDestTupleId(desc.getId().asInt()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java index 35b45282c5..07ad1fa208 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java @@ -28,7 +28,7 @@ import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.planner.ColumnRange; -import org.apache.doris.planner.external.ExternalFileScanNode; +import org.apache.doris.planner.external.FileQueryScanNode; import org.apache.doris.thrift.TFileAttributes; import org.apache.doris.thrift.TFileScanRangeParams; import
org.apache.doris.thrift.TFileScanSlotInfo; @@ -81,8 +81,8 @@ public class IcebergApiSource implements IcebergSource { } @Override - public ExternalFileScanNode.ParamCreateContext createContext() throws UserException { - ExternalFileScanNode.ParamCreateContext context = new ExternalFileScanNode.ParamCreateContext(); + public FileQueryScanNode.ParamCreateContext createContext() throws UserException { + FileQueryScanNode.ParamCreateContext context = new FileQueryScanNode.ParamCreateContext(); context.params = new TFileScanRangeParams(); context.destTupleDescriptor = desc; context.params.setDestTupleId(desc.getId().asInt()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java index 747d7fd6f6..0d99ab49c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java @@ -26,7 +26,7 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.planner.ColumnRange; -import org.apache.doris.planner.external.ExternalFileScanNode; +import org.apache.doris.planner.external.FileQueryScanNode; import org.apache.doris.planner.external.HiveScanProvider; import org.apache.doris.thrift.TFileAttributes; @@ -64,7 +64,7 @@ public class IcebergHMSSource implements IcebergSource { } @Override - public ExternalFileScanNode.ParamCreateContext createContext() throws UserException { + public FileQueryScanNode.ParamCreateContext createContext() throws UserException { return hiveScanProvider.createContext(null); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java index eb565638e3..3cb997a7f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java @@ -23,7 +23,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; -import org.apache.doris.planner.external.ExternalFileScanNode; +import org.apache.doris.planner.external.FileQueryScanNode; import org.apache.doris.planner.external.IcebergSplitter; import org.apache.doris.planner.external.QueryScanProvider; import org.apache.doris.thrift.TFileAttributes; @@ -146,7 +146,7 @@ public class IcebergScanProvider extends QueryScanProvider { } @Override - public ExternalFileScanNode.ParamCreateContext createContext(Analyzer analyzer) throws UserException { + public FileQueryScanNode.ParamCreateContext createContext(Analyzer analyzer) throws UserException { return icebergSource.createContext(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSource.java index ab17c6a448..e21cfaa48a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSource.java @@ -23,7 +23,7 @@ import org.apache.doris.common.DdlException; import 
org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalCatalog; -import org.apache.doris.planner.external.ExternalFileScanNode; +import org.apache.doris.planner.external.FileQueryScanNode; import org.apache.doris.thrift.TFileAttributes; public interface IcebergSource { @@ -32,7 +32,7 @@ public interface IcebergSource { org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException; - ExternalFileScanNode.ParamCreateContext createContext() throws UserException; + FileQueryScanNode.ParamCreateContext createContext() throws UserException; TableIf getTargetTable(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index dc13f74829..ee4c0135b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -419,8 +419,6 @@ public class Coordinator { this.queryOptions.setQueryType(type); } - public void setExecVecEngine(boolean vec) {} - public void setExecPipEngine(boolean vec) { this.queryOptions.setEnablePipelineEngine(vec); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java index 920bf34d7e..6f56ede3d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -31,7 +31,7 @@ import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; -import org.apache.doris.planner.external.ExternalFileScanNode; +import org.apache.doris.planner.external.FileQueryScanNode; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PFetchTableSchemaRequest; import org.apache.doris.proto.Types.PScalarType; @@ -310,7 +310,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio @Override public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) { - return new ExternalFileScanNode(id, desc, false); + return new FileQueryScanNode(id, desc, false); } @Override diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertArrayStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertArrayStmtTest.java index c82e1b0844..58a7f7a013 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertArrayStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertArrayStmtTest.java @@ -21,7 +21,6 @@ import org.apache.doris.catalog.ArrayType; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Config; import org.apache.doris.common.ExceptionChecker; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.qe.ConnectContext; @@ -114,7 +113,6 @@ public class InsertArrayStmtTest { @Test public void testTransactionalInsert() throws Exception { - Config.enable_new_load_scan_node = true; ExceptionChecker.expectThrowsNoException( () -> createTable("CREATE TABLE test.`txn_insert_tbl` (\n" + " `k1` int(11) NULL,\n" diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift index 1807ef1dd5..4bfd475946 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -151,9 +151,6 @@ struct TQueryOptions { // if the right table is greater than this value in the hash join, we will ignore IN filter 34: optional i32 runtime_filter_max_in_num = 1024; -// whether enable vectorized engine - 41: optional bool enable_vectorized_engine = true - // the resource limitation of this query 42: optional TResourceLimit resource_limit
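With enable_vectorized_engine removed from TQueryOptions, the pipeline flag set through Coordinator.setExecPipEngine (shown earlier) is the engine toggle that remains on the load path. A hedged sketch of that remaining path; TQueryOptions here is a hand-written stand-in, not the thrift-generated class:

    public class QueryOptionsSketch {
        // Simplified stand-in for the thrift-generated TQueryOptions struct.
        static final class TQueryOptions {
            private boolean enablePipelineEngine;

            void setEnablePipelineEngine(boolean v) { enablePipelineEngine = v; }
            boolean isEnablePipelineEngine() { return enablePipelineEngine; }
        }

        // Mirrors Coordinator.setExecPipEngine(boolean): the one engine switch
        // still forwarded to query options; the vectorized switch is simply gone.
        static void setExecPipEngine(TQueryOptions opts, boolean v) {
            opts.setEnablePipelineEngine(v);
        }

        public static void main(String[] args) {
            TQueryOptions opts = new TQueryOptions();
            setExecPipEngine(opts, false); // Config.enable_pipeline_load defaults to false
            System.out.println(opts.isEnablePipelineEngine()); // prints false
        }
    }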