diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 1c5c4b4944..adcb109ace 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -473,4 +473,14 @@ public abstract class ExternalCatalog implements CatalogIf, Wr } return specifiedDatabaseMap; } + + public boolean useSelfSplitter() { + Map properties = catalogProperty.getProperties(); + boolean ret = true; + if (properties.containsKey(HMSExternalCatalog.ENABLE_SELF_SPLITTER) + && properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("false")) { + ret = false; + } + return ret; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 2df1ed1225..435272782c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -26,10 +26,14 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.UserException; import org.apache.doris.datasource.CacheException; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.external.hive.util.HiveUtil; +import org.apache.doris.fs.FileSystemFactory; +import org.apache.doris.fs.RemoteFiles; import org.apache.doris.fs.remote.RemoteFile; +import org.apache.doris.fs.remote.RemoteFileSystem; import org.apache.doris.metric.GaugeMetric; import org.apache.doris.metric.Metric; import org.apache.doris.metric.MetricLabel; @@ -37,9 +41,8 @@ import org.apache.doris.metric.MetricRepo; import org.apache.doris.planner.ColumnBound; import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId; -import org.apache.doris.planner.Split; import org.apache.doris.planner.external.FileSplit; -import org.apache.doris.planner.external.HiveSplitter; +import org.apache.doris.spi.Split; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -263,6 +266,19 @@ public class HiveMetaStoreCache { return new HivePartition(key.dbName, key.tblName, false, sd.getInputFormat(), sd.getLocation(), key.values); } + // Get File Status by using FileSystem API. + private FileCacheValue getFileCache(String location, InputFormat inputFormat, + JobConf jobConf, + List partitionValues) throws UserException { + FileCacheValue result = new FileCacheValue(); + result.setSplittable(HiveUtil.isSplittable(inputFormat, new Path(location), jobConf)); + RemoteFileSystem fs = FileSystemFactory.getByLocation(location, jobConf); + RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false); + locatedFiles.locations().forEach(result::addFile); + result.setPartitionValues(partitionValues); + return result; + } + private FileCacheValue loadFiles(FileCacheKey key) { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); try { @@ -284,8 +300,7 @@ public class HiveMetaStoreCache { InputFormat inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false); // TODO: This is a temp config, will remove it after the HiveSplitter is stable. if (key.useSelfSplitter) { - result = HiveSplitter.getFileCache(finalLocation, inputFormat, - jobConf, key.getPartitionValues()); + result = getFileCache(finalLocation, inputFormat, jobConf, key.getPartitionValues()); } else { InputSplit[] splits; String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index f27abc0f69..acb3ebe87b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -44,6 +44,8 @@ import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.catalog.external.ExternalTable; +import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.catalog.external.IcebergExternalTable; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.Util; @@ -139,7 +141,9 @@ import org.apache.doris.planner.SetOperationNode; import org.apache.doris.planner.SortNode; import org.apache.doris.planner.TableFunctionNode; import org.apache.doris.planner.UnionNode; -import org.apache.doris.planner.external.FileQueryScanNode; +import org.apache.doris.planner.external.HiveScanNode; +import org.apache.doris.planner.external.HudiScanNode; +import org.apache.doris.planner.external.iceberg.IcebergScanNode; import org.apache.doris.tablefunction.TableValuedFunctionIf; import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TPushAggOp; @@ -600,24 +604,44 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor runtimeFilterGenerator.getTargetOnScanNode(fileScan.getId()).forEach( - expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, fileScanNode, context) + expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode, context) ) ); - Utils.execWithUncheckedException(fileScanNode::finalizeForNereids); + Utils.execWithUncheckedException(scanNode::finalizeForNereids); // Create PlanFragment DataPartition dataPartition = DataPartition.RANDOM; - PlanFragment planFragment = createPlanFragment(fileScanNode, dataPartition, fileScan); + PlanFragment planFragment = createPlanFragment(scanNode, dataPartition, fileScan); context.addPlanFragment(planFragment); updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), fileScan); return planFragment; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java index 07d1ff445f..9215d409ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java @@ -42,7 +42,6 @@ import org.apache.doris.common.UserException; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.planner.external.FileGroupInfo; import org.apache.doris.planner.external.FileScanNode; -import org.apache.doris.planner.external.FileScanProviderIf; import org.apache.doris.planner.external.LoadScanProvider; import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.statistics.StatisticalType; @@ -82,7 +81,7 @@ public class FileLoadScanNode extends FileScanNode { // Each DataDescription in a load stmt conreponding to a FileGroupInfo in this list. private final List fileGroupInfos = Lists.newArrayList(); // For load, the num of providers equals to the num of file group infos. - private final List scanProviders = Lists.newArrayList(); + private final List scanProviders = Lists.newArrayList(); // For load, the num of ParamCreateContext equals to the num of file group infos. private final List contexts = Lists.newArrayList(); @@ -128,13 +127,13 @@ public class FileLoadScanNode extends FileScanNode { // For each scan provider, create a corresponding ParamCreateContext private void initParamCreateContexts(Analyzer analyzer) throws UserException { - for (FileScanProviderIf scanProvider : scanProviders) { + for (LoadScanProvider scanProvider : scanProviders) { ParamCreateContext context = scanProvider.createContext(analyzer); // set where and preceding filter. // FIXME(cmy): we should support set different expr for different file group. initAndSetPrecedingFilter(context.fileGroup.getPrecedingFilterExpr(), context.srcTupleDescriptor, analyzer); initAndSetWhereExpr(context.fileGroup.getWhereExpr(), context.destTupleDescriptor, analyzer); - setDefaultValueExprs(scanProvider, context.srcSlotDescByName, context.params, true); + setDefaultValueExprs(scanProvider.getTargetTable(), context.srcSlotDescByName, context.params, true); this.contexts.add(context); } } @@ -196,7 +195,7 @@ public class FileLoadScanNode extends FileScanNode { contexts.size() + " vs. " + scanProviders.size()); for (int i = 0; i < contexts.size(); ++i) { FileLoadScanNode.ParamCreateContext context = contexts.get(i); - FileScanProviderIf scanProvider = scanProviders.get(i); + LoadScanProvider scanProvider = scanProviders.get(i); finalizeParamsForLoad(context, analyzer); createScanRangeLocations(context, scanProvider); this.inputSplitsNum += scanProvider.getInputSplitNum(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapSplitter.java deleted file mode 100644 index f2d06a3aef..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapSplitter.java +++ /dev/null @@ -1,31 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.planner; - -import org.apache.doris.analysis.Expr; -import org.apache.doris.common.UserException; - -import java.util.List; - -public class OlapSplitter implements Splitter { - - @Override - public List getSplits(List exprs) throws UserException { - return null; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index 13dd8638ba..6c4997ea77 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -37,8 +37,10 @@ import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.UserException; import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TScanRangeLocations; @@ -104,6 +106,10 @@ public abstract class ScanNode extends PlanNode { sortColumn = column; } + protected List getSplits() throws UserException { + throw new NotImplementedException("Scan node sub class need to implement getSplits interface."); + } + /** * cast expr to SlotDescriptor type */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 55df5b6f54..92e2b94148 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -60,6 +60,8 @@ import org.apache.doris.catalog.MysqlTable; import org.apache.doris.catalog.OdbcTable; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; @@ -67,6 +69,9 @@ import org.apache.doris.common.Reference; import org.apache.doris.common.UserException; import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.planner.external.FileQueryScanNode; +import org.apache.doris.planner.external.HiveScanNode; +import org.apache.doris.planner.external.HudiScanNode; +import org.apache.doris.planner.external.iceberg.IcebergScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; import org.apache.doris.thrift.TNullSide; @@ -1986,8 +1991,7 @@ public class SingleNodePlanner { case HIVE: throw new RuntimeException("Hive external table is not supported, try to use hive catalog please"); case ICEBERG: - scanNode = new FileQueryScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); - break; + throw new RuntimeException("Iceberg external table is not supported, use iceberg catalog please"); case HUDI: throw new UserException( "Hudi table is no longer supported. Use Multi Catalog feature to connect to Hudi"); @@ -1998,8 +2002,23 @@ public class SingleNodePlanner { scanNode = ((TableValuedFunctionRef) tblRef).getScanNode(ctx.getNextNodeId()); break; case HMS_EXTERNAL_TABLE: + TableIf table = tblRef.getDesc().getTable(); + switch (((HMSExternalTable) table).getDlaType()) { + case HUDI: + scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); + break; + case ICEBERG: + scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); + break; + case HIVE: + scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); + break; + default: + throw new UserException("Not supported table type" + table.getType()); + } + break; case ICEBERG_EXTERNAL_TABLE: - scanNode = new FileQueryScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); + scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); break; case ES_EXTERNAL_TABLE: scanNode = new EsScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "EsScanNode", true); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java deleted file mode 100644 index 5ad1034e61..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java +++ /dev/null @@ -1,29 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.planner; - -import org.apache.doris.analysis.Expr; -import org.apache.doris.common.UserException; - -import java.util.List; - -public interface Splitter { - static final long DEFAULT_SPLIT_SIZE = 128 * 1024 * 1024; // 128MB - - List getSplits(List exprs) throws UserException; -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 48f35eb363..d5927a1520 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -22,24 +22,38 @@ import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.FunctionGenTable; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.FsBroker; +import org.apache.doris.catalog.HdfsResource; import org.apache.doris.catalog.TableIf; -import org.apache.doris.catalog.external.HMSExternalTable; -import org.apache.doris.catalog.external.IcebergExternalTable; +import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.UserException; -import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; +import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.planner.PlanNodeId; -import org.apache.doris.planner.external.iceberg.IcebergApiSource; -import org.apache.doris.planner.external.iceberg.IcebergHMSSource; -import org.apache.doris.planner.external.iceberg.IcebergScanProvider; -import org.apache.doris.planner.external.iceberg.IcebergSource; +import org.apache.doris.planner.external.iceberg.IcebergScanNode; +import org.apache.doris.planner.external.iceberg.IcebergSplit; +import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; -import org.apache.doris.tablefunction.ExternalFileTableValuedFunction; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TExternalScanRange; +import org.apache.doris.thrift.TFileAttributes; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileRangeDesc; +import org.apache.doris.thrift.TFileScanRange; import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.TFileScanSlotInfo; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.THdfsParams; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TScanRange; +import org.apache.doris.thrift.TScanRangeLocation; +import org.apache.doris.thrift.TScanRangeLocations; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -48,20 +62,21 @@ import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; /** * FileQueryScanNode for querying the file access type of catalog, now only support - * hive,hudi and iceberg. + * hive,hudi, iceberg and TVF. */ -public class FileQueryScanNode extends FileScanNode { +public abstract class FileQueryScanNode extends FileScanNode { private static final Logger LOG = LogManager.getLogger(FileQueryScanNode.class); - // For query, there is only one FileScanProvider. - private FileScanProviderIf scanProvider; + protected Map destSlotDescByName; + protected TFileScanRangeParams params; - private Map destSlotDescByName; - private TFileScanRangeParams params; + protected int inputSplitNum = 0; + protected long inputFileSize = 0; /** * External file scan node for Query hms table @@ -69,8 +84,9 @@ public class FileQueryScanNode extends FileScanNode { * eg: s3 tvf * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check */ - public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { - super(id, desc, "FILE_QUERY_SCAN_NODE", StatisticalType.FILE_SCAN_NODE, needCheckColumnPriv); + public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, + StatisticalType statisticalType, boolean needCheckColumnPriv) { + super(id, desc, planNodeName, statisticalType, needCheckColumnPriv); } @Override @@ -87,36 +103,27 @@ public class FileQueryScanNode extends FileScanNode { } // Init scan provider and schema related params. - private void doInitialize() throws UserException { + protected void doInitialize() throws UserException { Preconditions.checkNotNull(desc); + ExternalTable table = (ExternalTable) desc.getTable(); + if (table.isView()) { + throw new AnalysisException( + String.format("Querying external view '%s.%s' is not supported", table.getDbName(), table.getName())); + } computeColumnFilter(); - initScanProvider(); initBackendPolicy(); initSchemaParams(); } - private void initScanProvider() throws UserException { - if (this.desc.getTable() instanceof HMSExternalTable) { - HMSExternalTable hmsTable = (HMSExternalTable) this.desc.getTable(); - initHMSTableScanProvider(hmsTable); - } else if (this.desc.getTable() instanceof FunctionGenTable) { - FunctionGenTable table = (FunctionGenTable) this.desc.getTable(); - initTVFScanProvider(table, (ExternalFileTableValuedFunction) table.getTvf()); - } else if (this.desc.getTable() instanceof IcebergExternalTable) { - IcebergExternalTable table = (IcebergExternalTable) this.desc.getTable(); - initIcebergScanProvider(table); - } - } - // Init schema (Tuple/Slot) related params. - private void initSchemaParams() throws UserException { + protected void initSchemaParams() throws UserException { destSlotDescByName = Maps.newHashMap(); for (SlotDescriptor slot : desc.getSlots()) { destSlotDescByName.put(slot.getColumn().getName(), slot); } params = new TFileScanRangeParams(); params.setDestTupleId(desc.getId().asInt()); - List partitionKeys = scanProvider.getPathPartitionKeys(); + List partitionKeys = getPathPartitionKeys(); List columns = desc.getTable().getBaseSchema(false); params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size()); for (SlotDescriptor slot : desc.getSlots()) { @@ -128,20 +135,13 @@ public class FileQueryScanNode extends FileScanNode { slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName())); params.addToRequiredSlots(slotInfo); } - setDefaultValueExprs(scanProvider, destSlotDescByName, params, false); + setDefaultValueExprs(getTargetTable(), destSlotDescByName, params, false); setColumnPositionMappingForTextFile(); // For query, set src tuple id to -1. params.setSrcTupleId(-1); - TableIf table = desc.getTable(); - // Slot to schema id map is used for supporting hive 1.x orc internal column name (col0, col1, col2...) - if (table instanceof HMSExternalTable) { - if (((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) { - genSlotToSchemaIdMap(); - } - } } - private void initBackendPolicy() throws UserException { + protected void initBackendPolicy() throws UserException { backendPolicy.init(); numNodes = backendPolicy.numBackends(); } @@ -161,74 +161,11 @@ public class FileQueryScanNode extends FileScanNode { TFileScanSlotInfo slotInfo = new TFileScanSlotInfo(); slotInfo.setSlotId(slot.getId().asInt()); - slotInfo.setIsFileSlot(!scanProvider.getPathPartitionKeys().contains(slot.getColumn().getName())); + slotInfo.setIsFileSlot(!getPathPartitionKeys().contains(slot.getColumn().getName())); params.addToRequiredSlots(slotInfo); } } - private void initHMSTableScanProvider(HMSExternalTable hmsTable) throws UserException { - Preconditions.checkNotNull(hmsTable); - - if (hmsTable.isView()) { - throw new AnalysisException( - String.format("Querying external view '[%s].%s.%s' is not supported", hmsTable.getDlaType(), - hmsTable.getDbName(), hmsTable.getName())); - } - - switch (hmsTable.getDlaType()) { - case HUDI: - scanProvider = new HudiScanProvider(hmsTable, desc, columnNameToRange); - break; - case ICEBERG: - IcebergSource hmsSource = new IcebergHMSSource(hmsTable, desc, columnNameToRange); - scanProvider = new IcebergScanProvider(hmsSource, analyzer); - break; - case HIVE: - String inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat(); - if (inputFormat.contains("TextInputFormat")) { - for (SlotDescriptor slot : desc.getSlots()) { - if (!slot.getType().isScalarType()) { - throw new UserException("For column `" + slot.getColumn().getName() - + "`, The column types ARRAY/MAP/STRUCT are not supported yet" - + " for text input format of Hive. "); - } - } - } - scanProvider = new HiveScanProvider(hmsTable, desc, columnNameToRange); - break; - default: - throw new UserException("Unknown table type: " + hmsTable.getDlaType()); - } - } - - private void initIcebergScanProvider(IcebergExternalTable icebergTable) throws UserException { - Preconditions.checkNotNull(icebergTable); - if (icebergTable.isView()) { - throw new AnalysisException( - String.format("Querying external view '%s.%s' is not supported", icebergTable.getDbName(), - icebergTable.getName())); - } - - String catalogType = icebergTable.getIcebergCatalogType(); - switch (catalogType) { - case IcebergExternalCatalog.ICEBERG_HMS: - case IcebergExternalCatalog.ICEBERG_REST: - case IcebergExternalCatalog.ICEBERG_DLF: - case IcebergExternalCatalog.ICEBERG_GLUE: - IcebergSource icebergSource = new IcebergApiSource( - icebergTable, desc, columnNameToRange); - scanProvider = new IcebergScanProvider(icebergSource, analyzer); - break; - default: - throw new UserException("Unknown iceberg catalog type: " + catalogType); - } - } - - private void initTVFScanProvider(FunctionGenTable table, ExternalFileTableValuedFunction tvf) { - Preconditions.checkNotNull(table); - scanProvider = new TVFScanProvider(table, desc, tvf); - } - @Override public void finalize(Analyzer analyzer) throws UserException { doFinalize(); @@ -240,19 +177,13 @@ public class FileQueryScanNode extends FileScanNode { } // Create scan range locations and the statistics. - private void doFinalize() throws UserException { - createScanRangeLocations(conjuncts, params, scanProvider); - this.inputSplitsNum += scanProvider.getInputSplitNum(); - this.totalFileSize += scanProvider.getInputFileSize(); - if (scanProvider instanceof HiveScanProvider) { - this.totalPartitionNum = ((HiveScanProvider) scanProvider).getTotalPartitionNum(); - this.readPartitionNum = ((HiveScanProvider) scanProvider).getReadPartitionNum(); - } + protected void doFinalize() throws UserException { + createScanRangeLocations(); } private void setColumnPositionMappingForTextFile() throws UserException { - TableIf tbl = scanProvider.getTargetTable(); + TableIf tbl = getTargetTable(); List columnIdxs = Lists.newArrayList(); for (TFileScanSlotInfo slot : params.getRequiredSlots()) { @@ -270,20 +201,161 @@ public class FileQueryScanNode extends FileScanNode { params.setColumnIdxs(columnIdxs); } - // To Support Hive 1.x orc internal column name like (_col0, _col1, _col2...) - private void genSlotToSchemaIdMap() { - List baseSchema = desc.getTable().getBaseSchema(); - Map columnNameToPosition = Maps.newHashMap(); - for (SlotDescriptor slot : desc.getSlots()) { - int idx = 0; - for (Column col : baseSchema) { - if (col.getName().equals(slot.getColumn().getName())) { - columnNameToPosition.put(col.getName(), idx); - break; + public void createScanRangeLocations() throws UserException { + long start = System.currentTimeMillis(); + List inputSplits = getSplits(); + this.inputSplitNum = inputSplits.size(); + if (inputSplits.isEmpty()) { + return; + } + FileSplit inputSplit = (FileSplit) inputSplits.get(0); + TFileType locationType = getLocationType(); + params.setFileType(locationType); + TFileFormatType fileFormatType = getFileFormatType(); + params.setFormatType(getFileFormatType()); + if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN || fileFormatType == TFileFormatType.FORMAT_JSON) { + params.setFileAttributes(getFileAttributes()); + } + + // set hdfs params for hdfs file type. + Map locationProperties = getLocationProperties(); + if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) { + String fsName = getFsName(inputSplit); + THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties); + tHdfsParams.setFsName(fsName); + params.setHdfsParams(tHdfsParams); + + if (locationType == TFileType.FILE_BROKER) { + FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker(); + if (broker == null) { + throw new UserException("No alive broker."); } - idx += 1; + params.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port)); + } + } else if (locationType == TFileType.FILE_S3) { + params.setProperties(locationProperties); + } + + List pathPartitionKeys = getPathPartitionKeys(); + for (Split split : inputSplits) { + TScanRangeLocations curLocations = newLocations(params, backendPolicy); + FileSplit fileSplit = (FileSplit) split; + + // If fileSplit has partition values, use the values collected from hive partitions. + // Otherwise, use the values in file path. + List partitionValuesFromPath = fileSplit.getPartitionValues() == null + ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false) + : fileSplit.getPartitionValues(); + + TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys); + // external data lake table + if (fileSplit instanceof IcebergSplit) { + IcebergScanNode.setIcebergParams(rangeDesc, (IcebergSplit) fileSplit); + } + + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); + LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}", + curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(), + fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts())); + scanRangeLocations.add(curLocations); + this.inputFileSize += fileSplit.getLength(); + } + LOG.debug("create #{} ScanRangeLocations cost: {} ms", + scanRangeLocations.size(), (System.currentTimeMillis() - start)); + } + + private TScanRangeLocations newLocations(TFileScanRangeParams params, FederationBackendPolicy backendPolicy) { + // Generate on file scan range + TFileScanRange fileScanRange = new TFileScanRange(); + fileScanRange.setParams(params); + + // Scan range + TExternalScanRange externalScanRange = new TExternalScanRange(); + externalScanRange.setFileScanRange(fileScanRange); + TScanRange scanRange = new TScanRange(); + scanRange.setExtScanRange(externalScanRange); + + // Locations + TScanRangeLocations locations = new TScanRangeLocations(); + locations.setScanRange(scanRange); + + TScanRangeLocation location = new TScanRangeLocation(); + Backend selectedBackend = backendPolicy.getNextBe(); + location.setBackendId(selectedBackend.getId()); + location.setServer(new TNetworkAddress(selectedBackend.getIp(), selectedBackend.getBePort())); + locations.addToLocations(location); + + return locations; + } + + private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List columnsFromPath, + List columnsFromPathKeys) + throws UserException { + TFileRangeDesc rangeDesc = new TFileRangeDesc(); + rangeDesc.setStartOffset(fileSplit.getStart()); + rangeDesc.setSize(fileSplit.getLength()); + // fileSize only be used when format is orc or parquet and TFileType is broker + // When TFileType is other type, it is not necessary + rangeDesc.setFileSize(fileSplit.getFileLength()); + rangeDesc.setColumnsFromPath(columnsFromPath); + rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys); + + if (getLocationType() == TFileType.FILE_HDFS) { + rangeDesc.setPath(fileSplit.getPath().toUri().getPath()); + } else if (getLocationType() == TFileType.FILE_S3 || getLocationType() == TFileType.FILE_BROKER) { + // need full path + rangeDesc.setPath(fileSplit.getPath().toString()); + } + return rangeDesc; + } + + protected TFileType getLocationType() throws UserException { + throw new NotImplementedException(""); + } + + protected TFileFormatType getFileFormatType() throws UserException { + throw new NotImplementedException(""); + } + + protected TFileAttributes getFileAttributes() throws UserException { + throw new NotImplementedException(""); + } + + protected List getPathPartitionKeys() throws UserException { + throw new NotImplementedException(""); + } + + protected TableIf getTargetTable() throws UserException { + throw new NotImplementedException(""); + } + + protected Map getLocationProperties() throws UserException { + throw new NotImplementedException(""); + } + + // eg: hdfs://namenode s3://buckets + protected String getFsName(FileSplit split) { + String fullPath = split.getPath().toUri().toString(); + String filePath = split.getPath().toUri().getPath(); + return fullPath.replace(filePath, ""); + } + + protected static Optional getTFileType(String location) { + if (location != null && !location.isEmpty()) { + if (FeConstants.isObjStorage(location)) { + return Optional.of(TFileType.FILE_S3); + } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) { + return Optional.of(TFileType.FILE_HDFS); + } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) { + return Optional.of(TFileType.FILE_LOCAL); + } else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) { + return Optional.of(TFileType.FILE_BROKER); + } else if (location.startsWith(FeConstants.FS_PREFIX_GFS)) { + return Optional.of(TFileType.FILE_BROKER); + } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) { + return Optional.of(TFileType.FILE_BROKER); } } - params.setSlotNameToSchemaPos(columnNameToPosition); + return Optional.empty(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java index 364844ef36..3a24ba3319 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java @@ -27,6 +27,8 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.UserException; import org.apache.doris.planner.FileLoadScanNode; import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TExpr; @@ -41,9 +43,12 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.Path; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.IOException; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -55,6 +60,8 @@ import java.util.Map; public class FileScanNode extends ExternalScanNode { private static final Logger LOG = LogManager.getLogger(FileScanNode.class); + public static final long DEFAULT_SPLIT_SIZE = 128 * 1024 * 1024; // 128MB + // For explain protected long inputSplitsNum = 0; protected long totalFileSize = 0; @@ -152,24 +159,17 @@ public class FileScanNode extends ExternalScanNode { return output.toString(); } - // TODO: Keep 2 versions of createScanRangeLocations, will fix this while refactor split and assignment code. + // TODO: This api is for load job only. Will remove it later. protected void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context, - FileScanProviderIf scanProvider) + LoadScanProvider scanProvider) throws UserException { scanProvider.createScanRangeLocations(context, backendPolicy, scanRangeLocations); } - protected void createScanRangeLocations(List conjuncts, TFileScanRangeParams params, - FileScanProviderIf scanProvider) - throws UserException { - scanProvider.createScanRangeLocations(conjuncts, params, backendPolicy, scanRangeLocations); - } - - protected void setDefaultValueExprs(FileScanProviderIf scanProvider, + protected void setDefaultValueExprs(TableIf tbl, Map slotDescByName, TFileScanRangeParams params, boolean useVarcharAsNull) throws UserException { - TableIf tbl = scanProvider.getTargetTable(); Preconditions.checkNotNull(tbl); TExpr tExpr = new TExpr(); tExpr.setNodes(Lists.newArrayList()); @@ -210,4 +210,54 @@ public class FileScanNode extends ExternalScanNode { } } } + + protected List splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length, + boolean splittable, List partitionValues) throws IOException { + if (blockLocations == null) { + blockLocations = new BlockLocation[0]; + } + long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize(); + if (splitSize <= 0) { + splitSize = blockSize; + } + // Min split size is DEFAULT_SPLIT_SIZE(128MB). + splitSize = Math.max(splitSize, DEFAULT_SPLIT_SIZE); + List result = Lists.newArrayList(); + if (!splittable) { + LOG.debug("Path {} is not splittable.", path); + String[] hosts = blockLocations.length == 0 ? null : blockLocations[0].getHosts(); + result.add(new FileSplit(path, 0, length, length, hosts, partitionValues)); + return result; + } + long bytesRemaining; + for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D; + bytesRemaining -= splitSize) { + int location = getBlockIndex(blockLocations, length - bytesRemaining); + String[] hosts = location == -1 ? null : blockLocations[location].getHosts(); + result.add(new FileSplit(path, length - bytesRemaining, splitSize, length, hosts, partitionValues)); + } + if (bytesRemaining != 0L) { + int location = getBlockIndex(blockLocations, length - bytesRemaining); + String[] hosts = location == -1 ? null : blockLocations[location].getHosts(); + result.add(new FileSplit(path, length - bytesRemaining, bytesRemaining, length, hosts, partitionValues)); + } + + LOG.debug("Path {} includes {} splits.", path, result.size()); + return result; + } + + private int getBlockIndex(BlockLocation[] blkLocations, long offset) { + if (blkLocations == null || blkLocations.length == 0) { + return -1; + } + for (int i = 0; i < blkLocations.length; ++i) { + if (blkLocations[i].getOffset() <= offset + && offset < blkLocations[i].getOffset() + blkLocations[i].getLength()) { + return i; + } + } + BlockLocation last = blkLocations[blkLocations.length - 1]; + long fileLength = last.getOffset() + last.getLength() - 1L; + throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength)); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java deleted file mode 100644 index 367e5d4e72..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java +++ /dev/null @@ -1,61 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.planner.external; - -import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.Expr; -import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.UserException; -import org.apache.doris.planner.FileLoadScanNode; -import org.apache.doris.thrift.TFileFormatType; -import org.apache.doris.thrift.TFileScanRangeParams; -import org.apache.doris.thrift.TFileType; -import org.apache.doris.thrift.TScanRangeLocations; - -import java.util.List; -import java.util.Map; - -public interface FileScanProviderIf { - // Return parquet/orc/text, etc. - TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException; - - // Return S3/HDSF, etc. - TFileType getLocationType() throws DdlException, MetaNotFoundException; - - // return properties for S3/HDFS, etc. - Map getLocationProperties() throws MetaNotFoundException, DdlException; - - List getPathPartitionKeys() throws DdlException, MetaNotFoundException; - - FileLoadScanNode.ParamCreateContext createContext(Analyzer analyzer) throws UserException; - - void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context, FederationBackendPolicy backendPolicy, - List scanRangeLocations) throws UserException; - - void createScanRangeLocations(List conjuncts, TFileScanRangeParams params, - FederationBackendPolicy backendPolicy, - List scanRangeLocations) throws UserException; - - int getInputSplitNum(); - - long getInputFileSize(); - - TableIf getTargetTable(); -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java index bd0fa97cc4..03f1ab3b60 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java @@ -17,7 +17,7 @@ package org.apache.doris.planner.external; -import org.apache.doris.planner.Split; +import org.apache.doris.spi.Split; import lombok.Data; import org.apache.hadoop.fs.Path; @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path; import java.util.List; @Data -public class FileSplit extends Split { +public class FileSplit implements Split { protected Path path; protected long start; // length of this split, in bytes @@ -34,6 +34,7 @@ public class FileSplit extends Split { // -1 means unset. // If the file length is not set, the file length will be fetched from the file system. protected long fileLength; + protected String[] hosts; protected TableFormatType tableFormatType; // The values of partitions. // e.g for file : hdfs://path/to/table/part1=a/part2=b/datafile @@ -46,15 +47,16 @@ public class FileSplit extends Split { this.start = start; this.length = length; this.fileLength = fileLength; - this.hosts = hosts; + this.hosts = hosts == null ? new String[0] : hosts; this.partitionValues = partitionValues; } public String[] getHosts() { - if (this.hosts == null) { - return new String[]{}; - } else { - return this.hosts; - } + return hosts; + } + + @Override + public Object getInfo() { + return null; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProvider.java deleted file mode 100644 index 283fc90f30..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProvider.java +++ /dev/null @@ -1,34 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.planner.external; - -import org.apache.doris.common.DdlException; -import org.apache.doris.common.MetaNotFoundException; - -import org.apache.hadoop.hive.metastore.api.Table; - -import java.util.Map; - -public abstract class HMSTableScanProvider extends QueryScanProvider { - - public abstract String getMetaStoreUrl(); - - public abstract Table getRemoteHiveTable() throws DdlException, MetaNotFoundException; - - public abstract Map getTableProperties() throws MetaNotFoundException; -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java similarity index 51% rename from fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java rename to fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java index 9d8093e982..96c1872010 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java @@ -17,32 +17,34 @@ package org.apache.doris.planner.external; -import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.HiveMetaStoreClientHelper; import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.hive.HiveMetaStoreCache; import org.apache.doris.datasource.hive.HivePartition; -import org.apache.doris.external.hive.util.HiveUtil; -import org.apache.doris.fs.FileSystemFactory; -import org.apache.doris.fs.RemoteFiles; -import org.apache.doris.fs.remote.RemoteFileSystem; -import org.apache.doris.planner.ColumnRange; import org.apache.doris.planner.ListPartitionPrunerV2; -import org.apache.doris.planner.Split; -import org.apache.doris.planner.Splitter; -import org.apache.doris.qe.ConnectContext; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.spi.Split; +import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.thrift.TFileAttributes; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileTextScanRangeParams; +import org.apache.doris.thrift.TFileType; import com.google.common.collect.Lists; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.JobConf; +import com.google.common.collect.Maps; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -50,23 +52,52 @@ import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; -public class HiveSplitter implements Splitter { +public class HiveScanNode extends FileQueryScanNode { + private static final Logger LOG = LogManager.getLogger(HiveScanNode.class); - private static final Logger LOG = LogManager.getLogger(HiveSplitter.class); + public static final String PROP_FIELD_DELIMITER = "field.delim"; + public static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01" + public static final String DEFAULT_LINE_DELIMITER = "\n"; - private HMSExternalTable hmsTable; - private Map columnNameToRange; - private int totalPartitionNum = 0; - private int readPartitionNum = 0; + private final HMSExternalTable hmsTable; - public HiveSplitter(HMSExternalTable hmsTable, Map columnNameToRange) { - this.hmsTable = hmsTable; - this.columnNameToRange = columnNameToRange; + /** + * * External file scan node for Query Hive table + * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv + * eg: s3 tvf + * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check + */ + public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { + super(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv); + hmsTable = (HMSExternalTable) desc.getTable(); + } + + public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, + StatisticalType statisticalType, boolean needCheckColumnPriv) { + super(id, desc, planNodeName, statisticalType, needCheckColumnPriv); + hmsTable = (HMSExternalTable) desc.getTable(); } @Override - public List getSplits(List exprs) throws UserException { + protected void doInitialize() throws UserException { + super.doInitialize(); + genSlotToSchemaIdMap(); + String inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat(); + if (inputFormat.contains("TextInputFormat")) { + for (SlotDescriptor slot : desc.getSlots()) { + if (!slot.getType().isScalarType()) { + throw new UserException("For column `" + slot.getColumn().getName() + + "`, The column types ARRAY/MAP/STRUCT are not supported yet" + + " for text input format of Hive. "); + } + } + } + } + + @Override + protected List getSplits() throws UserException { long start = System.currentTimeMillis(); try { HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() @@ -78,14 +109,7 @@ public class HiveSplitter implements Splitter { hivePartitionValues = cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(), partitionColumnTypes); } - Map properties = hmsTable.getCatalog().getCatalogProperty().getProperties(); - boolean useSelfSplitter = true; - if (properties.containsKey(HMSExternalCatalog.ENABLE_SELF_SPLITTER) - && properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("false")) { - LOG.debug("Using self splitter for hmsTable {}", hmsTable.getName()); - useSelfSplitter = false; - } - + boolean useSelfSplitter = hmsTable.getCatalog().useSelfSplitter(); List allFiles = Lists.newArrayList(); if (hivePartitionValues != null) { // 2. prune partitions by expr @@ -135,85 +159,88 @@ public class HiveSplitter implements Splitter { private void getFileSplitByPartitions(HiveMetaStoreCache cache, List partitions, List allFiles, boolean useSelfSplitter) throws IOException { + for (HiveMetaStoreCache.FileCacheValue fileCacheValue : cache.getFilesByPartitions(partitions, useSelfSplitter)) { + // This if branch is to support old splitter, will remove later. if (fileCacheValue.getSplits() != null) { allFiles.addAll(fileCacheValue.getSplits()); } if (fileCacheValue.getFiles() != null) { boolean isSplittable = fileCacheValue.isSplittable(); for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) { - allFiles.addAll(splitFile(status, isSplittable, fileCacheValue.getPartitionValues())); + allFiles.addAll(splitFile(status.getPath(), status.getBlockSize(), + status.getBlockLocations(), status.getLength(), + isSplittable, fileCacheValue.getPartitionValues())); } } } } - private List splitFile(HiveMetaStoreCache.HiveFileStatus status, - boolean splittable, List partitionValues) throws IOException { - List result = Lists.newArrayList(); - if (!splittable) { - LOG.debug("Path {} is not splittable.", status.getPath()); - BlockLocation block = status.getBlockLocations()[0]; - result.add(new FileSplit(status.getPath(), 0, status.getLength(), - status.getLength(), block.getHosts(), partitionValues)); - return result; - } - long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize(); - if (splitSize <= 0) { - splitSize = status.getBlockSize(); - } - // Min split size is DEFAULT_SPLIT_SIZE(128MB). - splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : DEFAULT_SPLIT_SIZE; - BlockLocation[] blockLocations = status.getBlockLocations(); - long length = status.getLength(); - long bytesRemaining; - for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D; - bytesRemaining -= splitSize) { - int location = getBlockIndex(blockLocations, length - bytesRemaining); - result.add(new FileSplit(status.getPath(), length - bytesRemaining, - splitSize, length, blockLocations[location].getHosts(), partitionValues)); - } - if (bytesRemaining != 0L) { - int location = getBlockIndex(blockLocations, length - bytesRemaining); - result.add(new FileSplit(status.getPath(), length - bytesRemaining, - bytesRemaining, length, blockLocations[location].getHosts(), partitionValues)); - } - - LOG.debug("Path {} includes {} splits.", status.getPath(), result.size()); - return result; + @Override + public List getPathPartitionKeys() { + return hmsTable.getRemoteTable().getPartitionKeys() + .stream().map(FieldSchema::getName).collect(Collectors.toList()); } - public int getTotalPartitionNum() { - return totalPartitionNum; + @Override + public TableIf getTargetTable() { + return hmsTable; } - public int getReadPartitionNum() { - return readPartitionNum; + @Override + protected TFileType getLocationType() throws UserException { + String location = hmsTable.getRemoteTable().getSd().getLocation(); + return getTFileType(location).orElseThrow(() -> + new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName())); } - // Get File Status by using FileSystem API. - public static HiveMetaStoreCache.FileCacheValue getFileCache(String location, InputFormat inputFormat, - JobConf jobConf, - List partitionValues) throws UserException { - HiveMetaStoreCache.FileCacheValue result = new HiveMetaStoreCache.FileCacheValue(); - result.setSplittable(HiveUtil.isSplittable(inputFormat, new Path(location), jobConf)); - RemoteFileSystem fs = FileSystemFactory.getByLocation(location, jobConf); - RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false); - locatedFiles.locations().forEach(result::addFile); - result.setPartitionValues(partitionValues); - return result; + @Override + public TFileFormatType getFileFormatType() throws UserException { + TFileFormatType type = null; + String inputFormatName = hmsTable.getRemoteTable().getSd().getInputFormat(); + String hiveFormat = HiveMetaStoreClientHelper.HiveFileFormat.getFormat(inputFormatName); + if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.PARQUET.getDesc())) { + type = TFileFormatType.FORMAT_PARQUET; + } else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.ORC.getDesc())) { + type = TFileFormatType.FORMAT_ORC; + } else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.TEXT_FILE.getDesc())) { + type = TFileFormatType.FORMAT_CSV_PLAIN; + } + return type; } - private static int getBlockIndex(BlockLocation[] blkLocations, long offset) { - for (int i = 0; i < blkLocations.length; ++i) { - if (blkLocations[i].getOffset() <= offset - && offset < blkLocations[i].getOffset() + blkLocations[i].getLength()) { - return i; + @Override + protected Map getLocationProperties() throws UserException { + return hmsTable.getCatalogProperties(); + } + + @Override + protected TFileAttributes getFileAttributes() throws UserException { + TFileTextScanRangeParams textParams = new TFileTextScanRangeParams(); + textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters() + .getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER)); + textParams.setLineDelimiter(DEFAULT_LINE_DELIMITER); + TFileAttributes fileAttributes = new TFileAttributes(); + fileAttributes.setTextParams(textParams); + fileAttributes.setHeaderType(""); + return fileAttributes; + } + + // To Support Hive 1.x orc internal column name like (_col0, _col1, _col2...) + private void genSlotToSchemaIdMap() { + List baseSchema = desc.getTable().getBaseSchema(); + Map columnNameToPosition = Maps.newHashMap(); + for (SlotDescriptor slot : desc.getSlots()) { + int idx = 0; + for (Column col : baseSchema) { + if (col.getName().equals(slot.getColumn().getName())) { + columnNameToPosition.put(col.getName(), idx); + break; + } + idx += 1; } } - BlockLocation last = blkLocations[blkLocations.length - 1]; - long fileLength = last.getOffset() + last.getLength() - 1L; - throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength)); + params.setSlotNameToSchemaPos(columnNameToPosition); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java deleted file mode 100644 index 4037f97a13..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java +++ /dev/null @@ -1,139 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.planner.external; - -import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.catalog.HiveMetaStoreClientHelper; -import org.apache.doris.catalog.TableIf; -import org.apache.doris.catalog.external.HMSExternalTable; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.UserException; -import org.apache.doris.planner.ColumnRange; -import org.apache.doris.thrift.TFileAttributes; -import org.apache.doris.thrift.TFileFormatType; -import org.apache.doris.thrift.TFileTextScanRangeParams; -import org.apache.doris.thrift.TFileType; - -import com.google.common.collect.Maps; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -/** - * A HiveScanProvider to get information for scan node. - */ -public class HiveScanProvider extends HMSTableScanProvider { - private static final Logger LOG = LogManager.getLogger(HiveScanProvider.class); - - private static final String PROP_FIELD_DELIMITER = "field.delim"; - private static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01" - private static final String DEFAULT_LINE_DELIMITER = "\n"; - - protected HMSExternalTable hmsTable; - protected final TupleDescriptor desc; - protected Map columnNameToRange; - - public HiveScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc, - Map columnNameToRange) { - this.hmsTable = hmsTable; - this.desc = desc; - this.columnNameToRange = columnNameToRange; - this.splitter = new HiveSplitter(hmsTable, columnNameToRange); - } - - @Override - public TableIf getTargetTable() { - return hmsTable; - } - - @Override - public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException { - TFileFormatType type = null; - String inputFormatName = getRemoteHiveTable().getSd().getInputFormat(); - String hiveFormat = HiveMetaStoreClientHelper.HiveFileFormat.getFormat(inputFormatName); - if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.PARQUET.getDesc())) { - type = TFileFormatType.FORMAT_PARQUET; - } else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.ORC.getDesc())) { - type = TFileFormatType.FORMAT_ORC; - } else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.TEXT_FILE.getDesc())) { - type = TFileFormatType.FORMAT_CSV_PLAIN; - } - return type; - } - - @Override - public TFileType getLocationType() throws DdlException, MetaNotFoundException { - String location = hmsTable.getRemoteTable().getSd().getLocation(); - return getTFileType(location).orElseThrow(() -> - new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName())); - } - - @Override - public String getMetaStoreUrl() { - return hmsTable.getMetastoreUri(); - } - - public int getTotalPartitionNum() { - return ((HiveSplitter) splitter).getTotalPartitionNum(); - } - - public int getReadPartitionNum() { - return ((HiveSplitter) splitter).getReadPartitionNum(); - } - - @Override - public Table getRemoteHiveTable() throws DdlException, MetaNotFoundException { - return hmsTable.getRemoteTable(); - } - - @Override - public Map getTableProperties() throws MetaNotFoundException { - // TODO: implement it when we really properties from remote table. - return Maps.newHashMap(); - } - - @Override - public Map getLocationProperties() throws MetaNotFoundException, DdlException { - return hmsTable.getCatalogProperties(); - } - - @Override - public List getPathPartitionKeys() throws DdlException, MetaNotFoundException { - return getRemoteHiveTable().getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList()); - } - - @Override - public TFileAttributes getFileAttributes() throws UserException { - TFileTextScanRangeParams textParams = new TFileTextScanRangeParams(); - textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters() - .getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER)); - textParams.setLineDelimiter(DEFAULT_LINE_DELIMITER); - TFileAttributes fileAttributes = new TFileAttributes(); - fileAttributes.setTextParams(textParams); - fileAttributes.setHeaderType(""); - return fileAttributes; - } -} - - diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanNode.java similarity index 59% rename from fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java rename to fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanNode.java index 59274ec521..34ff3b7457 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanNode.java @@ -18,34 +18,31 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.catalog.external.HMSExternalTable; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.planner.ColumnRange; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TFileFormatType; import java.util.Collections; import java.util.List; -import java.util.Map; -/** - * A file scan provider for hudi. - * HudiProvier is extended with hive since they both use input format interface to get the split. - */ -public class HudiScanProvider extends HiveScanProvider { - - public HudiScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc, - Map columnNameToRange) { - super(hmsTable, desc, columnNameToRange); +public class HudiScanNode extends HiveScanNode { + /** + * External file scan node for Query Hudi table + * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv + * eg: s3 tvf + * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check + */ + public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { + super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv); } @Override - public TFileFormatType getFileFormatType() throws DdlException { + public TFileFormatType getFileFormatType() { return TFileFormatType.FORMAT_PARQUET; } @Override - public List getPathPartitionKeys() throws DdlException, MetaNotFoundException { + public List getPathPartitionKeys() { return Collections.emptyList(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplitter.java deleted file mode 100644 index 15f19bf7a5..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplitter.java +++ /dev/null @@ -1,155 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.planner.external; - -import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.TableSnapshot; -import org.apache.doris.common.UserException; -import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.external.iceberg.util.IcebergUtils; -import org.apache.doris.planner.Split; -import org.apache.doris.planner.Splitter; -import org.apache.doris.planner.external.iceberg.IcebergDeleteFileFilter; -import org.apache.doris.planner.external.iceberg.IcebergScanProvider; -import org.apache.doris.planner.external.iceberg.IcebergSource; -import org.apache.doris.planner.external.iceberg.IcebergSplit; - -import org.apache.hadoop.fs.Path; -import org.apache.iceberg.BaseTable; -import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.FileContent; -import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.HistoryEntry; -import org.apache.iceberg.MetadataColumns; -import org.apache.iceberg.TableScan; -import org.apache.iceberg.exceptions.NotFoundException; -import org.apache.iceberg.expressions.Expression; -import org.apache.iceberg.types.Conversions; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.nio.ByteBuffer; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -public class IcebergSplitter implements Splitter { - private static final Logger LOG = LogManager.getLogger(IcebergSplitter.class); - - private final IcebergSource icebergSource; - private final Analyzer analyzer; - - public IcebergSplitter(IcebergSource icebergSource, Analyzer analyzer) { - this.icebergSource = icebergSource; - this.analyzer = analyzer; - } - - @Override - public List getSplits(List exprs) throws UserException { - List expressions = new ArrayList<>(); - org.apache.iceberg.Table table = icebergSource.getIcebergTable(); - for (Expr conjunct : exprs) { - Expression expression = IcebergUtils.convertToIcebergExpr(conjunct, table.schema()); - if (expression != null) { - expressions.add(expression); - } - } - TableScan scan = table.newScan(); - TableSnapshot tableSnapshot = icebergSource.getDesc().getRef().getTableSnapshot(); - if (tableSnapshot != null) { - TableSnapshot.VersionType type = tableSnapshot.getType(); - try { - if (type == TableSnapshot.VersionType.VERSION) { - scan = scan.useSnapshot(tableSnapshot.getVersion()); - } else { - long snapshotId = TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone()); - scan = scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId)); - } - } catch (IllegalArgumentException e) { - throw new UserException(e); - } - } - for (Expression predicate : expressions) { - scan = scan.filter(predicate); - } - List splits = new ArrayList<>(); - int formatVersion = ((BaseTable) table).operations().current().formatVersion(); - for (FileScanTask task : scan.planFiles()) { - long fileSize = task.file().fileSizeInBytes(); - for (FileScanTask splitTask : task.split(128 * 1024 * 1024)) { - String dataFilePath = splitTask.file().path().toString(); - IcebergSplit split = new IcebergSplit(new Path(dataFilePath), splitTask.start(), - splitTask.length(), fileSize, new String[0]); - split.setFormatVersion(formatVersion); - if (formatVersion >= IcebergScanProvider.MIN_DELETE_FILE_SUPPORT_VERSION) { - split.setDeleteFileFilters(getDeleteFileFilters(splitTask)); - } - split.setTableFormatType(TableFormatType.ICEBERG); - split.setAnalyzer(analyzer); - splits.add(split); - } - } - return splits; - } - - public static long getSnapshotIdAsOfTime(List historyEntries, long asOfTimestamp) { - // find history at or before asOfTimestamp - HistoryEntry latestHistory = null; - for (HistoryEntry entry : historyEntries) { - if (entry.timestampMillis() <= asOfTimestamp) { - if (latestHistory == null) { - latestHistory = entry; - continue; - } - if (entry.timestampMillis() > latestHistory.timestampMillis()) { - latestHistory = entry; - } - } - } - if (latestHistory == null) { - throw new NotFoundException("No version history at or before " - + Instant.ofEpochMilli(asOfTimestamp)); - } - return latestHistory.snapshotId(); - } - - private List getDeleteFileFilters(FileScanTask spitTask) { - List filters = new ArrayList<>(); - for (DeleteFile delete : spitTask.deletes()) { - if (delete.content() == FileContent.POSITION_DELETES) { - ByteBuffer lowerBoundBytes = delete.lowerBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId()); - Optional positionLowerBound = Optional.ofNullable(lowerBoundBytes) - .map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes)); - ByteBuffer upperBoundBytes = delete.upperBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId()); - Optional positionUpperBound = Optional.ofNullable(upperBoundBytes) - .map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes)); - filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(), - positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L))); - } else if (delete.content() == FileContent.EQUALITY_DELETES) { - // todo: filters.add(IcebergDeleteFileFilter.createEqualityDelete(delete.path().toString(), - // delete.equalityFieldIds())); - throw new IllegalStateException("Don't support equality delete file"); - } else { - throw new IllegalStateException("Unknown delete content: " + delete.content()); - } - } - return filters; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java index ea079b644e..89189ca970 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java @@ -18,7 +18,6 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ImportColumnDesc; import org.apache.doris.analysis.IntLiteral; import org.apache.doris.analysis.SlotRef; @@ -55,7 +54,7 @@ import com.google.common.collect.Maps; import java.util.List; import java.util.Map; -public class LoadScanProvider implements FileScanProviderIf { +public class LoadScanProvider { private FileGroupInfo fileGroupInfo; private TupleDescriptor destTupleDesc; @@ -65,27 +64,22 @@ public class LoadScanProvider implements FileScanProviderIf { this.destTupleDesc = destTupleDesc; } - @Override public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException { return null; } - @Override public TFileType getLocationType() throws DdlException, MetaNotFoundException { return null; } - @Override public Map getLocationProperties() throws MetaNotFoundException, DdlException { return null; } - @Override public List getPathPartitionKeys() throws DdlException, MetaNotFoundException { return null; } - @Override public FileLoadScanNode.ParamCreateContext createContext(Analyzer analyzer) throws UserException { FileLoadScanNode.ParamCreateContext ctx = new FileLoadScanNode.ParamCreateContext(); ctx.destTupleDescriptor = destTupleDesc; @@ -138,7 +132,6 @@ public class LoadScanProvider implements FileScanProviderIf { return ""; } - @Override public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context, FederationBackendPolicy backendPolicy, List scanRangeLocations) throws UserException { @@ -147,18 +140,10 @@ public class LoadScanProvider implements FileScanProviderIf { fileGroupInfo.createScanRangeLocations(context, backendPolicy, scanRangeLocations); } - @Override - public void createScanRangeLocations(List conjuncts, TFileScanRangeParams params, - FederationBackendPolicy backendPolicy, - List scanRangeLocations) throws UserException { - } - - @Override public int getInputSplitNum() { return fileGroupInfo.getFileStatuses().size(); } - @Override public long getInputFileSize() { long res = 0; for (TBrokerFileStatus fileStatus : fileGroupInfo.getFileStatuses()) { @@ -250,7 +235,6 @@ public class LoadScanProvider implements FileScanProviderIf { } } - @Override public TableIf getTargetTable() { return fileGroupInfo.getTargetTable(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java deleted file mode 100644 index ebd1831432..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java +++ /dev/null @@ -1,225 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.planner.external; - -import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.Expr; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.FsBroker; -import org.apache.doris.catalog.HdfsResource; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.FeConstants; -import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.UserException; -import org.apache.doris.common.util.BrokerUtil; -import org.apache.doris.planner.FileLoadScanNode; -import org.apache.doris.planner.Split; -import org.apache.doris.planner.Splitter; -import org.apache.doris.planner.external.iceberg.IcebergScanProvider; -import org.apache.doris.planner.external.iceberg.IcebergSplit; -import org.apache.doris.system.Backend; -import org.apache.doris.thrift.TExternalScanRange; -import org.apache.doris.thrift.TFileAttributes; -import org.apache.doris.thrift.TFileFormatType; -import org.apache.doris.thrift.TFileRangeDesc; -import org.apache.doris.thrift.TFileScanRange; -import org.apache.doris.thrift.TFileScanRangeParams; -import org.apache.doris.thrift.TFileType; -import org.apache.doris.thrift.THdfsParams; -import org.apache.doris.thrift.TNetworkAddress; -import org.apache.doris.thrift.TScanRange; -import org.apache.doris.thrift.TScanRangeLocation; -import org.apache.doris.thrift.TScanRangeLocations; - -import com.google.common.base.Joiner; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.List; -import java.util.Map; -import java.util.Optional; - -public abstract class QueryScanProvider implements FileScanProviderIf { - public static final Logger LOG = LogManager.getLogger(QueryScanProvider.class); - private int inputSplitNum = 0; - private long inputFileSize = 0; - protected Splitter splitter; - - public abstract TFileAttributes getFileAttributes() throws UserException; - - @Override - public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context, - FederationBackendPolicy backendPolicy, - List scanRangeLocations) throws UserException { - } - - @Override - public FileLoadScanNode.ParamCreateContext createContext(Analyzer analyzer) throws UserException { - return null; - } - - @Override - public void createScanRangeLocations(List conjuncts, TFileScanRangeParams params, - FederationBackendPolicy backendPolicy, - List scanRangeLocations) throws UserException { - long start = System.currentTimeMillis(); - List inputSplits = splitter.getSplits(conjuncts); - this.inputSplitNum = inputSplits.size(); - if (inputSplits.isEmpty()) { - return; - } - FileSplit inputSplit = (FileSplit) inputSplits.get(0); - TFileType locationType = getLocationType(); - params.setFileType(locationType); - TFileFormatType fileFormatType = getFileFormatType(); - params.setFormatType(getFileFormatType()); - if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN || fileFormatType == TFileFormatType.FORMAT_JSON) { - params.setFileAttributes(getFileAttributes()); - } - - // set hdfs params for hdfs file type. - Map locationProperties = getLocationProperties(); - if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) { - String fsName = ""; - if (this instanceof TVFScanProvider) { - fsName = ((TVFScanProvider) this).getFsName(); - } else { - String fullPath = inputSplit.getPath().toUri().toString(); - String filePath = inputSplit.getPath().toUri().getPath(); - // eg: - // hdfs://namenode - // s3://buckets - fsName = fullPath.replace(filePath, ""); - } - THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties); - tHdfsParams.setFsName(fsName); - params.setHdfsParams(tHdfsParams); - - if (locationType == TFileType.FILE_BROKER) { - FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker(); - if (broker == null) { - throw new UserException("No alive broker."); - } - params.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port)); - } - } else if (locationType == TFileType.FILE_S3) { - params.setProperties(locationProperties); - } - - List pathPartitionKeys = getPathPartitionKeys(); - for (Split split : inputSplits) { - TScanRangeLocations curLocations = newLocations(params, backendPolicy); - FileSplit fileSplit = (FileSplit) split; - - // If fileSplit has partition values, use the values collected from hive partitions. - // Otherwise, use the values in file path. - List partitionValuesFromPath = fileSplit.getPartitionValues() == null - ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false) - : fileSplit.getPartitionValues(); - - TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys); - // external data lake table - if (fileSplit instanceof IcebergSplit) { - IcebergScanProvider.setIcebergParams(rangeDesc, (IcebergSplit) fileSplit); - } - - curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); - LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}", - curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(), - fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts())); - scanRangeLocations.add(curLocations); - this.inputFileSize += fileSplit.getLength(); - } - LOG.debug("create #{} ScanRangeLocations cost: {} ms", - scanRangeLocations.size(), (System.currentTimeMillis() - start)); - } - - @Override - public int getInputSplitNum() { - return this.inputSplitNum; - } - - @Override - public long getInputFileSize() { - return this.inputFileSize; - } - - private TScanRangeLocations newLocations(TFileScanRangeParams params, FederationBackendPolicy backendPolicy) { - // Generate on file scan range - TFileScanRange fileScanRange = new TFileScanRange(); - fileScanRange.setParams(params); - - // Scan range - TExternalScanRange externalScanRange = new TExternalScanRange(); - externalScanRange.setFileScanRange(fileScanRange); - TScanRange scanRange = new TScanRange(); - scanRange.setExtScanRange(externalScanRange); - - // Locations - TScanRangeLocations locations = new TScanRangeLocations(); - locations.setScanRange(scanRange); - - TScanRangeLocation location = new TScanRangeLocation(); - Backend selectedBackend = backendPolicy.getNextBe(); - location.setBackendId(selectedBackend.getId()); - location.setServer(new TNetworkAddress(selectedBackend.getIp(), selectedBackend.getBePort())); - locations.addToLocations(location); - - return locations; - } - - private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List columnsFromPath, - List columnsFromPathKeys) - throws DdlException, MetaNotFoundException { - TFileRangeDesc rangeDesc = new TFileRangeDesc(); - rangeDesc.setStartOffset(fileSplit.getStart()); - rangeDesc.setSize(fileSplit.getLength()); - // fileSize only be used when format is orc or parquet and TFileType is broker - // When TFileType is other type, it is not necessary - rangeDesc.setFileSize(fileSplit.getFileLength()); - rangeDesc.setColumnsFromPath(columnsFromPath); - rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys); - - if (getLocationType() == TFileType.FILE_HDFS) { - rangeDesc.setPath(fileSplit.getPath().toUri().getPath()); - } else if (getLocationType() == TFileType.FILE_S3 || getLocationType() == TFileType.FILE_BROKER) { - // need full path - rangeDesc.setPath(fileSplit.getPath().toString()); - } - return rangeDesc; - } - - protected static Optional getTFileType(String location) { - if (location != null && !location.isEmpty()) { - if (FeConstants.isObjStorage(location)) { - return Optional.of(TFileType.FILE_S3); - } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) { - return Optional.of(TFileType.FILE_HDFS); - } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) { - return Optional.of(TFileType.FILE_LOCAL); - } else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) { - return Optional.of(TFileType.FILE_BROKER); - } else if (location.startsWith(FeConstants.FS_PREFIX_GFS)) { - return Optional.of(TFileType.FILE_BROKER); - } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) { - return Optional.of(TFileType.FILE_BROKER); - } - } - return Optional.empty(); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java new file mode 100644 index 0000000000..a8ede0d843 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.FunctionGenTable; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.spi.Split; +import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.tablefunction.ExternalFileTableValuedFunction; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileAttributes; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileType; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.Path; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class TVFScanNode extends FileQueryScanNode { + private static final Logger LOG = LogManager.getLogger(TVFScanNode.class); + + private final ExternalFileTableValuedFunction tableValuedFunction; + private final FunctionGenTable table; + + /** + * External file scan node for table value function + * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv + * eg: s3 tvf + * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check + */ + public TVFScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { + super(id, desc, "TVF_SCAN_NODE", StatisticalType.TVF_SCAN_NODE, needCheckColumnPriv); + table = (FunctionGenTable) this.desc.getTable(); + tableValuedFunction = (ExternalFileTableValuedFunction) table.getTvf(); + } + + @Override + protected void doInitialize() throws UserException { + Preconditions.checkNotNull(desc); + computeColumnFilter(); + initBackendPolicy(); + initSchemaParams(); + } + + @Override + protected String getFsName(FileSplit split) { + return tableValuedFunction.getFsName(); + } + + @Override + public TFileAttributes getFileAttributes() throws UserException { + return tableValuedFunction.getFileAttributes(); + } + + @Override + public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException { + return tableValuedFunction.getTFileFormatType(); + } + + @Override + public TFileType getLocationType() throws DdlException, MetaNotFoundException { + return tableValuedFunction.getTFileType(); + } + + @Override + public Map getLocationProperties() throws MetaNotFoundException, DdlException { + return tableValuedFunction.getLocationProperties(); + } + + @Override + public List getPathPartitionKeys() { + return Lists.newArrayList(); + } + + @Override + public TableIf getTargetTable() { + return table; + } + + @Override + public List getSplits() throws UserException { + List splits = Lists.newArrayList(); + List fileStatuses = tableValuedFunction.getFileStatuses(); + for (TBrokerFileStatus fileStatus : fileStatuses) { + Path path = new Path(fileStatus.getPath()); + try { + splits.addAll(splitFile(path, fileStatus.getBlockSize(), null, fileStatus.getSize(), + fileStatus.isSplitable, null)); + } catch (IOException e) { + LOG.warn("get file split failed for TVF: {}", path, e); + throw new UserException(e); + } + } + return splits; + } + + private void addFileSplits(Path path, long fileSize, long splitSize, List splits) { + long bytesRemaining; + for (bytesRemaining = fileSize; (double) bytesRemaining / (double) splitSize > 1.1D; + bytesRemaining -= splitSize) { + splits.add(new FileSplit(path, fileSize - bytesRemaining, splitSize, fileSize, new String[0], null)); + } + if (bytesRemaining != 0L) { + splits.add(new FileSplit(path, fileSize - bytesRemaining, bytesRemaining, fileSize, new String[0], null)); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java deleted file mode 100644 index e4941ffa5a..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java +++ /dev/null @@ -1,85 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.planner.external; - -import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.catalog.FunctionGenTable; -import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.UserException; -import org.apache.doris.tablefunction.ExternalFileTableValuedFunction; -import org.apache.doris.thrift.TFileAttributes; -import org.apache.doris.thrift.TFileFormatType; -import org.apache.doris.thrift.TFileType; - -import com.google.common.collect.Lists; - -import java.util.List; -import java.util.Map; - -public class TVFScanProvider extends QueryScanProvider { - private FunctionGenTable tvfTable; - private final TupleDescriptor desc; - private ExternalFileTableValuedFunction tableValuedFunction; - - public TVFScanProvider(FunctionGenTable tvfTable, TupleDescriptor desc, - ExternalFileTableValuedFunction tableValuedFunction) { - this.tvfTable = tvfTable; - this.desc = desc; - this.tableValuedFunction = tableValuedFunction; - this.splitter = new TVFSplitter(tableValuedFunction); - } - - public String getFsName() { - return tableValuedFunction.getFsName(); - } - - // =========== implement abstract methods of QueryScanProvider ================= - @Override - public TFileAttributes getFileAttributes() throws UserException { - return tableValuedFunction.getFileAttributes(); - } - - - // =========== implement interface methods of FileScanProviderIf ================ - @Override - public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException { - return tableValuedFunction.getTFileFormatType(); - } - - @Override - public TFileType getLocationType() throws DdlException, MetaNotFoundException { - return tableValuedFunction.getTFileType(); - } - - @Override - public Map getLocationProperties() throws MetaNotFoundException, DdlException { - return tableValuedFunction.getLocationProperties(); - } - - @Override - public List getPathPartitionKeys() throws DdlException, MetaNotFoundException { - return Lists.newArrayList(); - } - - @Override - public TableIf getTargetTable() { - return tvfTable; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java deleted file mode 100644 index e2f5e556aa..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java +++ /dev/null @@ -1,79 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.planner.external; - -import org.apache.doris.analysis.Expr; -import org.apache.doris.common.UserException; -import org.apache.doris.planner.Split; -import org.apache.doris.planner.Splitter; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.tablefunction.ExternalFileTableValuedFunction; -import org.apache.doris.thrift.TBrokerFileStatus; - -import com.google.common.collect.Lists; -import org.apache.hadoop.fs.Path; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.List; - -public class TVFSplitter implements Splitter { - - private static final Logger LOG = LogManager.getLogger(TVFSplitter.class); - - private ExternalFileTableValuedFunction tableValuedFunction; - - public TVFSplitter(ExternalFileTableValuedFunction tableValuedFunction) { - this.tableValuedFunction = tableValuedFunction; - } - - @Override - public List getSplits(List exprs) throws UserException { - List splits = Lists.newArrayList(); - List fileStatuses = tableValuedFunction.getFileStatuses(); - for (TBrokerFileStatus fileStatus : fileStatuses) { - long fileLength = fileStatus.getSize(); - Path path = new Path(fileStatus.getPath()); - if (fileStatus.isSplitable) { - long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize(); - if (splitSize <= 0) { - splitSize = fileStatus.getBlockSize(); - } - // Min split size is DEFAULT_SPLIT_SIZE(128MB). - splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : DEFAULT_SPLIT_SIZE; - addFileSplits(path, fileLength, splitSize, splits); - } else { - Split split = new FileSplit(path, 0, fileLength, fileLength, new String[0], null); - splits.add(split); - } - } - return splits; - } - - private void addFileSplits(Path path, long fileSize, long splitSize, List splits) { - long bytesRemaining; - for (bytesRemaining = fileSize; (double) bytesRemaining / (double) splitSize > 1.1D; - bytesRemaining -= splitSize) { - splits.add(new FileSplit(path, fileSize - bytesRemaining, splitSize, fileSize, new String[0], null)); - } - if (bytesRemaining != 0L) { - splits.add(new FileSplit(path, fileSize - bytesRemaining, bytesRemaining, fileSize, new String[0], null)); - } - } - -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java index 87760f77e1..0fc0f39dff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java @@ -26,8 +26,9 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.planner.ColumnRange; -import org.apache.doris.planner.external.HiveScanProvider; +import org.apache.doris.planner.external.HiveScanNode; import org.apache.doris.thrift.TFileAttributes; +import org.apache.doris.thrift.TFileTextScanRangeParams; import org.apache.iceberg.TableProperties; @@ -36,15 +37,14 @@ import java.util.Map; public class IcebergHMSSource implements IcebergSource { private final HMSExternalTable hmsTable; - private final HiveScanProvider hiveScanProvider; - private final TupleDescriptor desc; + private final Map columnNameToRange; public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc, Map columnNameToRange) { - this.hiveScanProvider = new HiveScanProvider(hmsTable, desc, columnNameToRange); this.hmsTable = hmsTable; this.desc = desc; + this.columnNameToRange = columnNameToRange; } @Override @@ -54,8 +54,8 @@ public class IcebergHMSSource implements IcebergSource { @Override public String getFileFormat() throws DdlException, MetaNotFoundException { - return hiveScanProvider.getRemoteHiveTable().getParameters() - .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); + return hmsTable.getRemoteTable().getParameters() + .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); } public org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException { @@ -64,12 +64,19 @@ public class IcebergHMSSource implements IcebergSource { @Override public TableIf getTargetTable() { - return hiveScanProvider.getTargetTable(); + return hmsTable; } @Override public TFileAttributes getFileAttributes() throws UserException { - return hiveScanProvider.getFileAttributes(); + TFileTextScanRangeParams textParams = new TFileTextScanRangeParams(); + textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters() + .getOrDefault(HiveScanNode.PROP_FIELD_DELIMITER, HiveScanNode.DEFAULT_FIELD_DELIMITER)); + textParams.setLineDelimiter(HiveScanNode.DEFAULT_LINE_DELIMITER); + TFileAttributes fileAttributes = new TFileAttributes(); + fileAttributes.setTextParams(textParams); + fileAttributes.setHeaderType(""); + return fileAttributes; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java new file mode 100644 index 0000000000..f19a7a43de --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java @@ -0,0 +1,286 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external.iceberg; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.TableSnapshot; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.external.ExternalTable; +import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.catalog.external.IcebergExternalTable; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; +import org.apache.doris.external.iceberg.util.IcebergUtils; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.planner.external.FileQueryScanNode; +import org.apache.doris.planner.external.TableFormatType; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.spi.Split; +import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.thrift.TFileAttributes; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileRangeDesc; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TIcebergDeleteFileDesc; +import org.apache.doris.thrift.TIcebergFileDesc; +import org.apache.doris.thrift.TTableFormatFileDesc; + +import avro.shaded.com.google.common.base.Preconditions; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.HistoryEntry; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.types.Conversions; + +import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.stream.Collectors; + +public class IcebergScanNode extends FileQueryScanNode { + + public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2; + + private IcebergSource source; + + /** + * External file scan node for Query iceberg table + * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv + * eg: s3 tvf + * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check + */ + public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { + super(id, desc, "ICEBERG_SCAN_NODE", StatisticalType.ICEBERG_SCAN_NODE, needCheckColumnPriv); + } + + @Override + protected void doInitialize() throws UserException { + ExternalTable table = (ExternalTable) desc.getTable(); + if (table.isView()) { + throw new AnalysisException( + String.format("Querying external view '%s.%s' is not supported", table.getDbName(), table.getName())); + } + computeColumnFilter(); + initBackendPolicy(); + if (table instanceof HMSExternalTable) { + source = new IcebergHMSSource((HMSExternalTable) table, desc, columnNameToRange); + } else if (table instanceof IcebergExternalTable) { + String catalogType = ((IcebergExternalTable) table).getIcebergCatalogType(); + switch (catalogType) { + case IcebergExternalCatalog.ICEBERG_HMS: + case IcebergExternalCatalog.ICEBERG_REST: + case IcebergExternalCatalog.ICEBERG_DLF: + case IcebergExternalCatalog.ICEBERG_GLUE: + source = new IcebergApiSource((IcebergExternalTable) table, desc, columnNameToRange); + break; + default: + throw new UserException("Unknown iceberg catalog type: " + catalogType); + } + } + Preconditions.checkNotNull(source); + initSchemaParams(); + } + + public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) { + TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); + tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value()); + TIcebergFileDesc fileDesc = new TIcebergFileDesc(); + int formatVersion = icebergSplit.getFormatVersion(); + fileDesc.setFormatVersion(formatVersion); + if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) { + fileDesc.setContent(FileContent.DATA.id()); + } else { + for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) { + TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc(); + deleteFileDesc.setPath(filter.getDeleteFilePath()); + if (filter instanceof IcebergDeleteFileFilter.PositionDelete) { + fileDesc.setContent(FileContent.POSITION_DELETES.id()); + IcebergDeleteFileFilter.PositionDelete positionDelete = + (IcebergDeleteFileFilter.PositionDelete) filter; + OptionalLong lowerBound = positionDelete.getPositionLowerBound(); + OptionalLong upperBound = positionDelete.getPositionUpperBound(); + if (lowerBound.isPresent()) { + deleteFileDesc.setPositionLowerBound(lowerBound.getAsLong()); + } + if (upperBound.isPresent()) { + deleteFileDesc.setPositionUpperBound(upperBound.getAsLong()); + } + } else { + fileDesc.setContent(FileContent.EQUALITY_DELETES.id()); + IcebergDeleteFileFilter.EqualityDelete equalityDelete = + (IcebergDeleteFileFilter.EqualityDelete) filter; + deleteFileDesc.setFieldIds(equalityDelete.getFieldIds()); + } + fileDesc.addToDeleteFiles(deleteFileDesc); + } + } + tableFormatFileDesc.setIcebergParams(fileDesc); + rangeDesc.setTableFormatParams(tableFormatFileDesc); + } + + @Override + public List getSplits() throws UserException { + List expressions = new ArrayList<>(); + org.apache.iceberg.Table table = source.getIcebergTable(); + for (Expr conjunct : conjuncts) { + Expression expression = IcebergUtils.convertToIcebergExpr(conjunct, table.schema()); + if (expression != null) { + expressions.add(expression); + } + } + TableScan scan = table.newScan(); + TableSnapshot tableSnapshot = source.getDesc().getRef().getTableSnapshot(); + if (tableSnapshot != null) { + TableSnapshot.VersionType type = tableSnapshot.getType(); + try { + if (type == TableSnapshot.VersionType.VERSION) { + scan = scan.useSnapshot(tableSnapshot.getVersion()); + } else { + long snapshotId = TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone()); + scan = scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId)); + } + } catch (IllegalArgumentException e) { + throw new UserException(e); + } + } + for (Expression predicate : expressions) { + scan = scan.filter(predicate); + } + List splits = new ArrayList<>(); + int formatVersion = ((BaseTable) table).operations().current().formatVersion(); + // Min split size is DEFAULT_SPLIT_SIZE(128MB). + long splitSize = Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(), DEFAULT_SPLIT_SIZE); + for (FileScanTask task : scan.planFiles()) { + long fileSize = task.file().fileSizeInBytes(); + for (FileScanTask splitTask : task.split(splitSize)) { + String dataFilePath = splitTask.file().path().toString(); + IcebergSplit split = new IcebergSplit(new Path(dataFilePath), splitTask.start(), + splitTask.length(), fileSize, new String[0]); + split.setFormatVersion(formatVersion); + if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) { + split.setDeleteFileFilters(getDeleteFileFilters(splitTask)); + } + split.setTableFormatType(TableFormatType.ICEBERG); + splits.add(split); + } + } + return splits; + } + + private long getSnapshotIdAsOfTime(List historyEntries, long asOfTimestamp) { + // find history at or before asOfTimestamp + HistoryEntry latestHistory = null; + for (HistoryEntry entry : historyEntries) { + if (entry.timestampMillis() <= asOfTimestamp) { + if (latestHistory == null) { + latestHistory = entry; + continue; + } + if (entry.timestampMillis() > latestHistory.timestampMillis()) { + latestHistory = entry; + } + } + } + if (latestHistory == null) { + throw new NotFoundException("No version history at or before " + + Instant.ofEpochMilli(asOfTimestamp)); + } + return latestHistory.snapshotId(); + } + + private List getDeleteFileFilters(FileScanTask spitTask) { + List filters = new ArrayList<>(); + for (DeleteFile delete : spitTask.deletes()) { + if (delete.content() == FileContent.POSITION_DELETES) { + ByteBuffer lowerBoundBytes = delete.lowerBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId()); + Optional positionLowerBound = Optional.ofNullable(lowerBoundBytes) + .map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes)); + ByteBuffer upperBoundBytes = delete.upperBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId()); + Optional positionUpperBound = Optional.ofNullable(upperBoundBytes) + .map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes)); + filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(), + positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L))); + } else if (delete.content() == FileContent.EQUALITY_DELETES) { + // todo: filters.add(IcebergDeleteFileFilter.createEqualityDelete(delete.path().toString(), + throw new IllegalStateException("Don't support equality delete file"); + } else { + throw new IllegalStateException("Unknown delete content: " + delete.content()); + } + } + return filters; + } + + @Override + public TFileType getLocationType() throws UserException { + Table icebergTable = source.getIcebergTable(); + String location = icebergTable.location(); + return getTFileType(location).orElseThrow(() -> + new DdlException("Unknown file location " + location + " for iceberg table " + icebergTable.name())); + } + + @Override + public TFileFormatType getFileFormatType() throws UserException { + TFileFormatType type; + String icebergFormat = source.getFileFormat(); + if (icebergFormat.equalsIgnoreCase("parquet")) { + type = TFileFormatType.FORMAT_PARQUET; + } else if (icebergFormat.equalsIgnoreCase("orc")) { + type = TFileFormatType.FORMAT_ORC; + } else { + throw new DdlException(String.format("Unsupported format name: %s for iceberg table.", icebergFormat)); + } + return type; + } + + @Override + public TFileAttributes getFileAttributes() throws UserException { + return source.getFileAttributes(); + } + + @Override + public List getPathPartitionKeys() throws UserException { + return source.getIcebergTable().spec().fields().stream().map(PartitionField::name) + .collect(Collectors.toList()); + } + + @Override + public TableIf getTargetTable() { + return source.getTargetTable(); + } + + @Override + public Map getLocationProperties() throws UserException { + return source.getCatalog().getProperties(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java deleted file mode 100644 index d0bd72f5c7..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java +++ /dev/null @@ -1,137 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.planner.external.iceberg; - -import org.apache.doris.analysis.Analyzer; -import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.UserException; -import org.apache.doris.planner.external.IcebergSplitter; -import org.apache.doris.planner.external.QueryScanProvider; -import org.apache.doris.thrift.TFileAttributes; -import org.apache.doris.thrift.TFileFormatType; -import org.apache.doris.thrift.TFileRangeDesc; -import org.apache.doris.thrift.TFileType; -import org.apache.doris.thrift.TIcebergDeleteFileDesc; -import org.apache.doris.thrift.TIcebergFileDesc; -import org.apache.doris.thrift.TTableFormatFileDesc; - -import org.apache.iceberg.FileContent; -import org.apache.iceberg.PartitionField; - -import java.util.List; -import java.util.Map; -import java.util.OptionalLong; -import java.util.stream.Collectors; - -/** - * A file scan provider for iceberg. - */ -public class IcebergScanProvider extends QueryScanProvider { - - public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2; - private final Analyzer analyzer; - private final IcebergSource icebergSource; - - public IcebergScanProvider(IcebergSource icebergSource, Analyzer analyzer) { - this.icebergSource = icebergSource; - this.analyzer = analyzer; - this.splitter = new IcebergSplitter(icebergSource, analyzer); - } - - public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) { - TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); - tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value()); - TIcebergFileDesc fileDesc = new TIcebergFileDesc(); - int formatVersion = icebergSplit.getFormatVersion(); - fileDesc.setFormatVersion(formatVersion); - if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) { - fileDesc.setContent(FileContent.DATA.id()); - } else { - for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) { - TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc(); - deleteFileDesc.setPath(filter.getDeleteFilePath()); - if (filter instanceof IcebergDeleteFileFilter.PositionDelete) { - fileDesc.setContent(FileContent.POSITION_DELETES.id()); - IcebergDeleteFileFilter.PositionDelete positionDelete = - (IcebergDeleteFileFilter.PositionDelete) filter; - OptionalLong lowerBound = positionDelete.getPositionLowerBound(); - OptionalLong upperBound = positionDelete.getPositionUpperBound(); - if (lowerBound.isPresent()) { - deleteFileDesc.setPositionLowerBound(lowerBound.getAsLong()); - } - if (upperBound.isPresent()) { - deleteFileDesc.setPositionUpperBound(upperBound.getAsLong()); - } - } else { - fileDesc.setContent(FileContent.EQUALITY_DELETES.id()); - IcebergDeleteFileFilter.EqualityDelete equalityDelete = - (IcebergDeleteFileFilter.EqualityDelete) filter; - deleteFileDesc.setFieldIds(equalityDelete.getFieldIds()); - } - fileDesc.addToDeleteFiles(deleteFileDesc); - } - } - tableFormatFileDesc.setIcebergParams(fileDesc); - rangeDesc.setTableFormatParams(tableFormatFileDesc); - } - - @Override - public TFileType getLocationType() throws DdlException, MetaNotFoundException { - org.apache.iceberg.Table table = icebergSource.getIcebergTable(); - String location = table.location(); - return getTFileType(location).orElseThrow(() -> - new DdlException("Unknown file location " + location + " for iceberg table " + table.name())); - } - - @Override - public List getPathPartitionKeys() throws DdlException, MetaNotFoundException { - return icebergSource.getIcebergTable().spec().fields().stream().map(PartitionField::name) - .collect(Collectors.toList()); - } - - @Override - public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException { - TFileFormatType type; - String icebergFormat = icebergSource.getFileFormat(); - if (icebergFormat.equalsIgnoreCase("parquet")) { - type = TFileFormatType.FORMAT_PARQUET; - } else if (icebergFormat.equalsIgnoreCase("orc")) { - type = TFileFormatType.FORMAT_ORC; - } else { - throw new DdlException(String.format("Unsupported format name: %s for iceberg table.", icebergFormat)); - } - return type; - } - - @Override - public Map getLocationProperties() throws MetaNotFoundException, DdlException { - return icebergSource.getCatalog().getProperties(); - } - - @Override - public TableIf getTargetTable() { - return icebergSource.getTargetTable(); - } - - @Override - public TFileAttributes getFileAttributes() throws UserException { - return icebergSource.getFileAttributes(); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java index e840c9a876..896b4968b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java @@ -17,7 +17,6 @@ package org.apache.doris.planner.external.iceberg; -import org.apache.doris.analysis.Analyzer; import org.apache.doris.planner.external.FileSplit; import lombok.Data; @@ -31,8 +30,6 @@ public class IcebergSplit extends FileSplit { super(file, start, length, fileLength, hosts, null); } - private Analyzer analyzer; - private String dataFilePath; private Integer formatVersion; private List deleteFileFilters; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Split.java b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java similarity index 79% rename from fe/fe-core/src/main/java/org/apache/doris/planner/Split.java rename to fe/fe-core/src/main/java/org/apache/doris/spi/Split.java index 63b837aacc..31b1e1515a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/Split.java +++ b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java @@ -15,17 +15,15 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.planner; +package org.apache.doris.spi; -import lombok.Data; +/** + * Split interface. e.g. Tablet for Olap Table. + */ +public interface Split { -@Data -public abstract class Split { - protected String[] hosts; + String[] getHosts(); - public Split() {} + Object getInfo(); - public Split(String[] hosts) { - this.hosts = hosts; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java index 586c9139cc..c39be3cf2c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java @@ -31,6 +31,8 @@ public enum StatisticalType { HASH_JOIN_NODE, HIVE_SCAN_NODE, ICEBERG_SCAN_NODE, + HUDI_SCAN_NODE, + TVF_SCAN_NODE, INTERSECT_NODE, LOAD_SCAN_NODE, MYSQL_SCAN_NODE, diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java index bde54ab4b0..48e8307799 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -37,7 +37,7 @@ import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; -import org.apache.doris.planner.external.FileQueryScanNode; +import org.apache.doris.planner.external.TVFScanNode; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PFetchTableSchemaRequest; import org.apache.doris.proto.Types.PScalarType; @@ -317,7 +317,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio @Override public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) { - return new FileQueryScanNode(id, desc, false); + return new TVFScanNode(id, desc, false); } @Override