[Refactor](ScanNode) Split interface refactor (#19133)
Move the getSplits function to ScanNode and remove the Splitter interface. For each kind of data source, create a specific ScanNode and implement the getSplits interface — for example, HiveScanNode. Remove FileScanProviderIf and move its code into each ScanNode.
This commit is contained in:
@ -473,4 +473,14 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
|
||||
}
|
||||
return specifiedDatabaseMap;
|
||||
}
|
||||
|
||||
public boolean useSelfSplitter() {
|
||||
Map<String, String> properties = catalogProperty.getProperties();
|
||||
boolean ret = true;
|
||||
if (properties.containsKey(HMSExternalCatalog.ENABLE_SELF_SPLITTER)
|
||||
&& properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("false")) {
|
||||
ret = false;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
@ -26,10 +26,14 @@ import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.datasource.CacheException;
|
||||
import org.apache.doris.datasource.HMSExternalCatalog;
|
||||
import org.apache.doris.external.hive.util.HiveUtil;
|
||||
import org.apache.doris.fs.FileSystemFactory;
|
||||
import org.apache.doris.fs.RemoteFiles;
|
||||
import org.apache.doris.fs.remote.RemoteFile;
|
||||
import org.apache.doris.fs.remote.RemoteFileSystem;
|
||||
import org.apache.doris.metric.GaugeMetric;
|
||||
import org.apache.doris.metric.Metric;
|
||||
import org.apache.doris.metric.MetricLabel;
|
||||
@ -37,9 +41,8 @@ import org.apache.doris.metric.MetricRepo;
|
||||
import org.apache.doris.planner.ColumnBound;
|
||||
import org.apache.doris.planner.ListPartitionPrunerV2;
|
||||
import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
|
||||
import org.apache.doris.planner.Split;
|
||||
import org.apache.doris.planner.external.FileSplit;
|
||||
import org.apache.doris.planner.external.HiveSplitter;
|
||||
import org.apache.doris.spi.Split;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
@ -263,6 +266,19 @@ public class HiveMetaStoreCache {
|
||||
return new HivePartition(key.dbName, key.tblName, false, sd.getInputFormat(), sd.getLocation(), key.values);
|
||||
}
|
||||
|
||||
// Get File Status by using FileSystem API.
|
||||
private FileCacheValue getFileCache(String location, InputFormat<?, ?> inputFormat,
|
||||
JobConf jobConf,
|
||||
List<String> partitionValues) throws UserException {
|
||||
FileCacheValue result = new FileCacheValue();
|
||||
result.setSplittable(HiveUtil.isSplittable(inputFormat, new Path(location), jobConf));
|
||||
RemoteFileSystem fs = FileSystemFactory.getByLocation(location, jobConf);
|
||||
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
|
||||
locatedFiles.locations().forEach(result::addFile);
|
||||
result.setPartitionValues(partitionValues);
|
||||
return result;
|
||||
}
|
||||
|
||||
private FileCacheValue loadFiles(FileCacheKey key) {
|
||||
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
|
||||
try {
|
||||
@ -284,8 +300,7 @@ public class HiveMetaStoreCache {
|
||||
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
|
||||
// TODO: This is a temp config, will remove it after the HiveSplitter is stable.
|
||||
if (key.useSelfSplitter) {
|
||||
result = HiveSplitter.getFileCache(finalLocation, inputFormat,
|
||||
jobConf, key.getPartitionValues());
|
||||
result = getFileCache(finalLocation, inputFormat, jobConf, key.getPartitionValues());
|
||||
} else {
|
||||
InputSplit[] splits;
|
||||
String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
|
||||
|
||||
@ -44,6 +44,8 @@ import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.catalog.external.ExternalTable;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.catalog.external.IcebergExternalTable;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.Util;
|
||||
@ -139,7 +141,9 @@ import org.apache.doris.planner.SetOperationNode;
|
||||
import org.apache.doris.planner.SortNode;
|
||||
import org.apache.doris.planner.TableFunctionNode;
|
||||
import org.apache.doris.planner.UnionNode;
|
||||
import org.apache.doris.planner.external.FileQueryScanNode;
|
||||
import org.apache.doris.planner.external.HiveScanNode;
|
||||
import org.apache.doris.planner.external.HudiScanNode;
|
||||
import org.apache.doris.planner.external.iceberg.IcebergScanNode;
|
||||
import org.apache.doris.tablefunction.TableValuedFunctionIf;
|
||||
import org.apache.doris.thrift.TPartitionType;
|
||||
import org.apache.doris.thrift.TPushAggOp;
|
||||
@ -600,24 +604,44 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
ExternalTable table = fileScan.getTable();
|
||||
TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, table, context);
|
||||
tupleDescriptor.setTable(table);
|
||||
|
||||
// TODO(cmy): determine the needCheckColumnPriv param
|
||||
FileQueryScanNode fileScanNode = new FileQueryScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
|
||||
ScanNode scanNode = null;
|
||||
if (table instanceof HMSExternalTable) {
|
||||
switch (((HMSExternalTable) table).getDlaType()) {
|
||||
case HUDI:
|
||||
scanNode = new HudiScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
|
||||
break;
|
||||
case ICEBERG:
|
||||
scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
|
||||
break;
|
||||
case HIVE:
|
||||
scanNode = new HiveScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
} else if (table instanceof IcebergExternalTable) {
|
||||
scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
|
||||
}
|
||||
Preconditions.checkNotNull(scanNode);
|
||||
TableName tableName = new TableName(null, "", "");
|
||||
TableRef ref = new TableRef(tableName, null, null);
|
||||
BaseTableRef tableRef = new BaseTableRef(ref, table, tableName);
|
||||
tupleDescriptor.setRef(tableRef);
|
||||
|
||||
Utils.execWithUncheckedException(fileScanNode::init);
|
||||
context.addScanNode(fileScanNode);
|
||||
Utils.execWithUncheckedException(scanNode::init);
|
||||
context.addScanNode(scanNode);
|
||||
ScanNode finalScanNode = scanNode;
|
||||
context.getRuntimeTranslator().ifPresent(
|
||||
runtimeFilterGenerator -> runtimeFilterGenerator.getTargetOnScanNode(fileScan.getId()).forEach(
|
||||
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, fileScanNode, context)
|
||||
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode, context)
|
||||
)
|
||||
);
|
||||
Utils.execWithUncheckedException(fileScanNode::finalizeForNereids);
|
||||
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
|
||||
// Create PlanFragment
|
||||
DataPartition dataPartition = DataPartition.RANDOM;
|
||||
PlanFragment planFragment = createPlanFragment(fileScanNode, dataPartition, fileScan);
|
||||
PlanFragment planFragment = createPlanFragment(scanNode, dataPartition, fileScan);
|
||||
context.addPlanFragment(planFragment);
|
||||
updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), fileScan);
|
||||
return planFragment;
|
||||
|
||||
@ -42,7 +42,6 @@ import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.load.BrokerFileGroup;
|
||||
import org.apache.doris.planner.external.FileGroupInfo;
|
||||
import org.apache.doris.planner.external.FileScanNode;
|
||||
import org.apache.doris.planner.external.FileScanProviderIf;
|
||||
import org.apache.doris.planner.external.LoadScanProvider;
|
||||
import org.apache.doris.rewrite.ExprRewriter;
|
||||
import org.apache.doris.statistics.StatisticalType;
|
||||
@ -82,7 +81,7 @@ public class FileLoadScanNode extends FileScanNode {
|
||||
// Each DataDescription in a load stmt corresponds to a FileGroupInfo in this list.
|
||||
private final List<FileGroupInfo> fileGroupInfos = Lists.newArrayList();
|
||||
// For load, the num of providers equals to the num of file group infos.
|
||||
private final List<FileScanProviderIf> scanProviders = Lists.newArrayList();
|
||||
private final List<LoadScanProvider> scanProviders = Lists.newArrayList();
|
||||
// For load, the num of ParamCreateContext equals to the num of file group infos.
|
||||
private final List<ParamCreateContext> contexts = Lists.newArrayList();
|
||||
|
||||
@ -128,13 +127,13 @@ public class FileLoadScanNode extends FileScanNode {
|
||||
|
||||
// For each scan provider, create a corresponding ParamCreateContext
|
||||
private void initParamCreateContexts(Analyzer analyzer) throws UserException {
|
||||
for (FileScanProviderIf scanProvider : scanProviders) {
|
||||
for (LoadScanProvider scanProvider : scanProviders) {
|
||||
ParamCreateContext context = scanProvider.createContext(analyzer);
|
||||
// set where and preceding filter.
|
||||
// FIXME(cmy): we should support set different expr for different file group.
|
||||
initAndSetPrecedingFilter(context.fileGroup.getPrecedingFilterExpr(), context.srcTupleDescriptor, analyzer);
|
||||
initAndSetWhereExpr(context.fileGroup.getWhereExpr(), context.destTupleDescriptor, analyzer);
|
||||
setDefaultValueExprs(scanProvider, context.srcSlotDescByName, context.params, true);
|
||||
setDefaultValueExprs(scanProvider.getTargetTable(), context.srcSlotDescByName, context.params, true);
|
||||
this.contexts.add(context);
|
||||
}
|
||||
}
|
||||
@ -196,7 +195,7 @@ public class FileLoadScanNode extends FileScanNode {
|
||||
contexts.size() + " vs. " + scanProviders.size());
|
||||
for (int i = 0; i < contexts.size(); ++i) {
|
||||
FileLoadScanNode.ParamCreateContext context = contexts.get(i);
|
||||
FileScanProviderIf scanProvider = scanProviders.get(i);
|
||||
LoadScanProvider scanProvider = scanProviders.get(i);
|
||||
finalizeParamsForLoad(context, analyzer);
|
||||
createScanRangeLocations(context, scanProvider);
|
||||
this.inputSplitsNum += scanProvider.getInputSplitNum();
|
||||
|
||||
@ -1,31 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.planner;
|
||||
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.common.UserException;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class OlapSplitter implements Splitter {
|
||||
|
||||
@Override
|
||||
public List<Split> getSplits(List<Expr> exprs) throws UserException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -37,8 +37,10 @@ import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.analysis.TupleId;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.common.NotImplementedException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
|
||||
import org.apache.doris.spi.Split;
|
||||
import org.apache.doris.statistics.StatisticalType;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TScanRangeLocations;
|
||||
@ -104,6 +106,10 @@ public abstract class ScanNode extends PlanNode {
|
||||
sortColumn = column;
|
||||
}
|
||||
|
||||
protected List<Split> getSplits() throws UserException {
|
||||
throw new NotImplementedException("Scan node sub class need to implement getSplits interface.");
|
||||
}
|
||||
|
||||
/**
|
||||
* cast expr to SlotDescriptor type
|
||||
*/
|
||||
|
||||
@ -60,6 +60,8 @@ import org.apache.doris.catalog.MysqlTable;
|
||||
import org.apache.doris.catalog.OdbcTable;
|
||||
import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.Pair;
|
||||
@ -67,6 +69,9 @@ import org.apache.doris.common.Reference;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.VectorizedUtil;
|
||||
import org.apache.doris.planner.external.FileQueryScanNode;
|
||||
import org.apache.doris.planner.external.HiveScanNode;
|
||||
import org.apache.doris.planner.external.HudiScanNode;
|
||||
import org.apache.doris.planner.external.iceberg.IcebergScanNode;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
|
||||
import org.apache.doris.thrift.TNullSide;
|
||||
@ -1986,8 +1991,7 @@ public class SingleNodePlanner {
|
||||
case HIVE:
|
||||
throw new RuntimeException("Hive external table is not supported, try to use hive catalog please");
|
||||
case ICEBERG:
|
||||
scanNode = new FileQueryScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
|
||||
break;
|
||||
throw new RuntimeException("Iceberg external table is not supported, use iceberg catalog please");
|
||||
case HUDI:
|
||||
throw new UserException(
|
||||
"Hudi table is no longer supported. Use Multi Catalog feature to connect to Hudi");
|
||||
@ -1998,8 +2002,23 @@ public class SingleNodePlanner {
|
||||
scanNode = ((TableValuedFunctionRef) tblRef).getScanNode(ctx.getNextNodeId());
|
||||
break;
|
||||
case HMS_EXTERNAL_TABLE:
|
||||
TableIf table = tblRef.getDesc().getTable();
|
||||
switch (((HMSExternalTable) table).getDlaType()) {
|
||||
case HUDI:
|
||||
scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
|
||||
break;
|
||||
case ICEBERG:
|
||||
scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
|
||||
break;
|
||||
case HIVE:
|
||||
scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
|
||||
break;
|
||||
default:
|
||||
throw new UserException("Not supported table type" + table.getType());
|
||||
}
|
||||
break;
|
||||
case ICEBERG_EXTERNAL_TABLE:
|
||||
scanNode = new FileQueryScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
|
||||
scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
|
||||
break;
|
||||
case ES_EXTERNAL_TABLE:
|
||||
scanNode = new EsScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "EsScanNode", true);
|
||||
|
||||
@ -1,29 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.planner;
|
||||
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.common.UserException;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface Splitter {
|
||||
static final long DEFAULT_SPLIT_SIZE = 128 * 1024 * 1024; // 128MB
|
||||
|
||||
List<Split> getSplits(List<Expr> exprs) throws UserException;
|
||||
}
|
||||
@ -22,24 +22,38 @@ import org.apache.doris.analysis.SlotDescriptor;
|
||||
import org.apache.doris.analysis.SlotId;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.FunctionGenTable;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.FsBroker;
|
||||
import org.apache.doris.catalog.HdfsResource;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.catalog.external.IcebergExternalTable;
|
||||
import org.apache.doris.catalog.external.ExternalTable;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.NotImplementedException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
|
||||
import org.apache.doris.common.util.BrokerUtil;
|
||||
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
|
||||
import org.apache.doris.planner.PlanNodeId;
|
||||
import org.apache.doris.planner.external.iceberg.IcebergApiSource;
|
||||
import org.apache.doris.planner.external.iceberg.IcebergHMSSource;
|
||||
import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
|
||||
import org.apache.doris.planner.external.iceberg.IcebergSource;
|
||||
import org.apache.doris.planner.external.iceberg.IcebergScanNode;
|
||||
import org.apache.doris.planner.external.iceberg.IcebergSplit;
|
||||
import org.apache.doris.spi.Split;
|
||||
import org.apache.doris.statistics.StatisticalType;
|
||||
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.thrift.TExternalScanRange;
|
||||
import org.apache.doris.thrift.TFileAttributes;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
import org.apache.doris.thrift.TFileRangeDesc;
|
||||
import org.apache.doris.thrift.TFileScanRange;
|
||||
import org.apache.doris.thrift.TFileScanRangeParams;
|
||||
import org.apache.doris.thrift.TFileScanSlotInfo;
|
||||
import org.apache.doris.thrift.TFileType;
|
||||
import org.apache.doris.thrift.THdfsParams;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TScanRange;
|
||||
import org.apache.doris.thrift.TScanRangeLocation;
|
||||
import org.apache.doris.thrift.TScanRangeLocations;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
@ -48,20 +62,21 @@ import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* FileQueryScanNode for querying the file access type of catalog, now only support
|
||||
* hive,hudi and iceberg.
|
||||
* hive, hudi, iceberg and TVF.
|
||||
*/
|
||||
public class FileQueryScanNode extends FileScanNode {
|
||||
public abstract class FileQueryScanNode extends FileScanNode {
|
||||
private static final Logger LOG = LogManager.getLogger(FileQueryScanNode.class);
|
||||
|
||||
// For query, there is only one FileScanProvider.
|
||||
private FileScanProviderIf scanProvider;
|
||||
protected Map<String, SlotDescriptor> destSlotDescByName;
|
||||
protected TFileScanRangeParams params;
|
||||
|
||||
private Map<String, SlotDescriptor> destSlotDescByName;
|
||||
private TFileScanRangeParams params;
|
||||
protected int inputSplitNum = 0;
|
||||
protected long inputFileSize = 0;
|
||||
|
||||
/**
|
||||
* External file scan node for Query hms table
|
||||
@ -69,8 +84,9 @@ public class FileQueryScanNode extends FileScanNode {
|
||||
* eg: s3 tvf
|
||||
* These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check
|
||||
*/
|
||||
public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
|
||||
super(id, desc, "FILE_QUERY_SCAN_NODE", StatisticalType.FILE_SCAN_NODE, needCheckColumnPriv);
|
||||
/**
 * Creates a file query scan node.
 *
 * @param id                  plan node id
 * @param desc                tuple descriptor of the scanned table
 * @param planNodeName        display name of the concrete scan node
 * @param statisticalType     statistical type reported by this node
 * @param needCheckColumnPriv whether column-level privileges must be checked
 */
public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
                         StatisticalType statisticalType, boolean needCheckColumnPriv) {
    super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
}
|
||||
|
||||
@Override
|
||||
@ -87,36 +103,27 @@ public class FileQueryScanNode extends FileScanNode {
|
||||
}
|
||||
|
||||
// Init scan provider and schema related params.
|
||||
private void doInitialize() throws UserException {
|
||||
protected void doInitialize() throws UserException {
|
||||
Preconditions.checkNotNull(desc);
|
||||
ExternalTable table = (ExternalTable) desc.getTable();
|
||||
if (table.isView()) {
|
||||
throw new AnalysisException(
|
||||
String.format("Querying external view '%s.%s' is not supported", table.getDbName(), table.getName()));
|
||||
}
|
||||
computeColumnFilter();
|
||||
initScanProvider();
|
||||
initBackendPolicy();
|
||||
initSchemaParams();
|
||||
}
|
||||
|
||||
private void initScanProvider() throws UserException {
|
||||
if (this.desc.getTable() instanceof HMSExternalTable) {
|
||||
HMSExternalTable hmsTable = (HMSExternalTable) this.desc.getTable();
|
||||
initHMSTableScanProvider(hmsTable);
|
||||
} else if (this.desc.getTable() instanceof FunctionGenTable) {
|
||||
FunctionGenTable table = (FunctionGenTable) this.desc.getTable();
|
||||
initTVFScanProvider(table, (ExternalFileTableValuedFunction) table.getTvf());
|
||||
} else if (this.desc.getTable() instanceof IcebergExternalTable) {
|
||||
IcebergExternalTable table = (IcebergExternalTable) this.desc.getTable();
|
||||
initIcebergScanProvider(table);
|
||||
}
|
||||
}
|
||||
|
||||
// Init schema (Tuple/Slot) related params.
|
||||
private void initSchemaParams() throws UserException {
|
||||
protected void initSchemaParams() throws UserException {
|
||||
destSlotDescByName = Maps.newHashMap();
|
||||
for (SlotDescriptor slot : desc.getSlots()) {
|
||||
destSlotDescByName.put(slot.getColumn().getName(), slot);
|
||||
}
|
||||
params = new TFileScanRangeParams();
|
||||
params.setDestTupleId(desc.getId().asInt());
|
||||
List<String> partitionKeys = scanProvider.getPathPartitionKeys();
|
||||
List<String> partitionKeys = getPathPartitionKeys();
|
||||
List<Column> columns = desc.getTable().getBaseSchema(false);
|
||||
params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size());
|
||||
for (SlotDescriptor slot : desc.getSlots()) {
|
||||
@ -128,20 +135,13 @@ public class FileQueryScanNode extends FileScanNode {
|
||||
slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName()));
|
||||
params.addToRequiredSlots(slotInfo);
|
||||
}
|
||||
setDefaultValueExprs(scanProvider, destSlotDescByName, params, false);
|
||||
setDefaultValueExprs(getTargetTable(), destSlotDescByName, params, false);
|
||||
setColumnPositionMappingForTextFile();
|
||||
// For query, set src tuple id to -1.
|
||||
params.setSrcTupleId(-1);
|
||||
TableIf table = desc.getTable();
|
||||
// Slot to schema id map is used for supporting hive 1.x orc internal column name (col0, col1, col2...)
|
||||
if (table instanceof HMSExternalTable) {
|
||||
if (((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
|
||||
genSlotToSchemaIdMap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void initBackendPolicy() throws UserException {
|
||||
protected void initBackendPolicy() throws UserException {
|
||||
backendPolicy.init();
|
||||
numNodes = backendPolicy.numBackends();
|
||||
}
|
||||
@ -161,74 +161,11 @@ public class FileQueryScanNode extends FileScanNode {
|
||||
|
||||
TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();
|
||||
slotInfo.setSlotId(slot.getId().asInt());
|
||||
slotInfo.setIsFileSlot(!scanProvider.getPathPartitionKeys().contains(slot.getColumn().getName()));
|
||||
slotInfo.setIsFileSlot(!getPathPartitionKeys().contains(slot.getColumn().getName()));
|
||||
params.addToRequiredSlots(slotInfo);
|
||||
}
|
||||
}
|
||||
|
||||
private void initHMSTableScanProvider(HMSExternalTable hmsTable) throws UserException {
|
||||
Preconditions.checkNotNull(hmsTable);
|
||||
|
||||
if (hmsTable.isView()) {
|
||||
throw new AnalysisException(
|
||||
String.format("Querying external view '[%s].%s.%s' is not supported", hmsTable.getDlaType(),
|
||||
hmsTable.getDbName(), hmsTable.getName()));
|
||||
}
|
||||
|
||||
switch (hmsTable.getDlaType()) {
|
||||
case HUDI:
|
||||
scanProvider = new HudiScanProvider(hmsTable, desc, columnNameToRange);
|
||||
break;
|
||||
case ICEBERG:
|
||||
IcebergSource hmsSource = new IcebergHMSSource(hmsTable, desc, columnNameToRange);
|
||||
scanProvider = new IcebergScanProvider(hmsSource, analyzer);
|
||||
break;
|
||||
case HIVE:
|
||||
String inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat();
|
||||
if (inputFormat.contains("TextInputFormat")) {
|
||||
for (SlotDescriptor slot : desc.getSlots()) {
|
||||
if (!slot.getType().isScalarType()) {
|
||||
throw new UserException("For column `" + slot.getColumn().getName()
|
||||
+ "`, The column types ARRAY/MAP/STRUCT are not supported yet"
|
||||
+ " for text input format of Hive. ");
|
||||
}
|
||||
}
|
||||
}
|
||||
scanProvider = new HiveScanProvider(hmsTable, desc, columnNameToRange);
|
||||
break;
|
||||
default:
|
||||
throw new UserException("Unknown table type: " + hmsTable.getDlaType());
|
||||
}
|
||||
}
|
||||
|
||||
private void initIcebergScanProvider(IcebergExternalTable icebergTable) throws UserException {
|
||||
Preconditions.checkNotNull(icebergTable);
|
||||
if (icebergTable.isView()) {
|
||||
throw new AnalysisException(
|
||||
String.format("Querying external view '%s.%s' is not supported", icebergTable.getDbName(),
|
||||
icebergTable.getName()));
|
||||
}
|
||||
|
||||
String catalogType = icebergTable.getIcebergCatalogType();
|
||||
switch (catalogType) {
|
||||
case IcebergExternalCatalog.ICEBERG_HMS:
|
||||
case IcebergExternalCatalog.ICEBERG_REST:
|
||||
case IcebergExternalCatalog.ICEBERG_DLF:
|
||||
case IcebergExternalCatalog.ICEBERG_GLUE:
|
||||
IcebergSource icebergSource = new IcebergApiSource(
|
||||
icebergTable, desc, columnNameToRange);
|
||||
scanProvider = new IcebergScanProvider(icebergSource, analyzer);
|
||||
break;
|
||||
default:
|
||||
throw new UserException("Unknown iceberg catalog type: " + catalogType);
|
||||
}
|
||||
}
|
||||
|
||||
private void initTVFScanProvider(FunctionGenTable table, ExternalFileTableValuedFunction tvf) {
|
||||
Preconditions.checkNotNull(table);
|
||||
scanProvider = new TVFScanProvider(table, desc, tvf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void finalize(Analyzer analyzer) throws UserException {
|
||||
doFinalize();
|
||||
@ -240,19 +177,13 @@ public class FileQueryScanNode extends FileScanNode {
|
||||
}
|
||||
|
||||
// Create scan range locations and the statistics.
|
||||
private void doFinalize() throws UserException {
|
||||
createScanRangeLocations(conjuncts, params, scanProvider);
|
||||
this.inputSplitsNum += scanProvider.getInputSplitNum();
|
||||
this.totalFileSize += scanProvider.getInputFileSize();
|
||||
if (scanProvider instanceof HiveScanProvider) {
|
||||
this.totalPartitionNum = ((HiveScanProvider) scanProvider).getTotalPartitionNum();
|
||||
this.readPartitionNum = ((HiveScanProvider) scanProvider).getReadPartitionNum();
|
||||
}
|
||||
protected void doFinalize() throws UserException {
|
||||
createScanRangeLocations();
|
||||
}
|
||||
|
||||
private void setColumnPositionMappingForTextFile()
|
||||
throws UserException {
|
||||
TableIf tbl = scanProvider.getTargetTable();
|
||||
TableIf tbl = getTargetTable();
|
||||
List<Integer> columnIdxs = Lists.newArrayList();
|
||||
|
||||
for (TFileScanSlotInfo slot : params.getRequiredSlots()) {
|
||||
@ -270,20 +201,161 @@ public class FileQueryScanNode extends FileScanNode {
|
||||
params.setColumnIdxs(columnIdxs);
|
||||
}
|
||||
|
||||
// To Support Hive 1.x orc internal column name like (_col0, _col1, _col2...)
|
||||
private void genSlotToSchemaIdMap() {
|
||||
List<Column> baseSchema = desc.getTable().getBaseSchema();
|
||||
Map<String, Integer> columnNameToPosition = Maps.newHashMap();
|
||||
for (SlotDescriptor slot : desc.getSlots()) {
|
||||
int idx = 0;
|
||||
for (Column col : baseSchema) {
|
||||
if (col.getName().equals(slot.getColumn().getName())) {
|
||||
columnNameToPosition.put(col.getName(), idx);
|
||||
break;
|
||||
public void createScanRangeLocations() throws UserException {
|
||||
long start = System.currentTimeMillis();
|
||||
List<Split> inputSplits = getSplits();
|
||||
this.inputSplitNum = inputSplits.size();
|
||||
if (inputSplits.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
FileSplit inputSplit = (FileSplit) inputSplits.get(0);
|
||||
TFileType locationType = getLocationType();
|
||||
params.setFileType(locationType);
|
||||
TFileFormatType fileFormatType = getFileFormatType();
|
||||
params.setFormatType(getFileFormatType());
|
||||
if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN || fileFormatType == TFileFormatType.FORMAT_JSON) {
|
||||
params.setFileAttributes(getFileAttributes());
|
||||
}
|
||||
|
||||
// set hdfs params for hdfs file type.
|
||||
Map<String, String> locationProperties = getLocationProperties();
|
||||
if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
|
||||
String fsName = getFsName(inputSplit);
|
||||
THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
|
||||
tHdfsParams.setFsName(fsName);
|
||||
params.setHdfsParams(tHdfsParams);
|
||||
|
||||
if (locationType == TFileType.FILE_BROKER) {
|
||||
FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
|
||||
if (broker == null) {
|
||||
throw new UserException("No alive broker.");
|
||||
}
|
||||
idx += 1;
|
||||
params.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port));
|
||||
}
|
||||
} else if (locationType == TFileType.FILE_S3) {
|
||||
params.setProperties(locationProperties);
|
||||
}
|
||||
|
||||
List<String> pathPartitionKeys = getPathPartitionKeys();
|
||||
for (Split split : inputSplits) {
|
||||
TScanRangeLocations curLocations = newLocations(params, backendPolicy);
|
||||
FileSplit fileSplit = (FileSplit) split;
|
||||
|
||||
// If fileSplit has partition values, use the values collected from hive partitions.
|
||||
// Otherwise, use the values in file path.
|
||||
List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null
|
||||
? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false)
|
||||
: fileSplit.getPartitionValues();
|
||||
|
||||
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
|
||||
// external data lake table
|
||||
if (fileSplit instanceof IcebergSplit) {
|
||||
IcebergScanNode.setIcebergParams(rangeDesc, (IcebergSplit) fileSplit);
|
||||
}
|
||||
|
||||
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
|
||||
LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}",
|
||||
curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(),
|
||||
fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts()));
|
||||
scanRangeLocations.add(curLocations);
|
||||
this.inputFileSize += fileSplit.getLength();
|
||||
}
|
||||
LOG.debug("create #{} ScanRangeLocations cost: {} ms",
|
||||
scanRangeLocations.size(), (System.currentTimeMillis() - start));
|
||||
}
|
||||
|
||||
private TScanRangeLocations newLocations(TFileScanRangeParams params, FederationBackendPolicy backendPolicy) {
|
||||
// Generate on file scan range
|
||||
TFileScanRange fileScanRange = new TFileScanRange();
|
||||
fileScanRange.setParams(params);
|
||||
|
||||
// Scan range
|
||||
TExternalScanRange externalScanRange = new TExternalScanRange();
|
||||
externalScanRange.setFileScanRange(fileScanRange);
|
||||
TScanRange scanRange = new TScanRange();
|
||||
scanRange.setExtScanRange(externalScanRange);
|
||||
|
||||
// Locations
|
||||
TScanRangeLocations locations = new TScanRangeLocations();
|
||||
locations.setScanRange(scanRange);
|
||||
|
||||
TScanRangeLocation location = new TScanRangeLocation();
|
||||
Backend selectedBackend = backendPolicy.getNextBe();
|
||||
location.setBackendId(selectedBackend.getId());
|
||||
location.setServer(new TNetworkAddress(selectedBackend.getIp(), selectedBackend.getBePort()));
|
||||
locations.addToLocations(location);
|
||||
|
||||
return locations;
|
||||
}
|
||||
|
||||
private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath,
|
||||
List<String> columnsFromPathKeys)
|
||||
throws UserException {
|
||||
TFileRangeDesc rangeDesc = new TFileRangeDesc();
|
||||
rangeDesc.setStartOffset(fileSplit.getStart());
|
||||
rangeDesc.setSize(fileSplit.getLength());
|
||||
// fileSize only be used when format is orc or parquet and TFileType is broker
|
||||
// When TFileType is other type, it is not necessary
|
||||
rangeDesc.setFileSize(fileSplit.getFileLength());
|
||||
rangeDesc.setColumnsFromPath(columnsFromPath);
|
||||
rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
|
||||
|
||||
if (getLocationType() == TFileType.FILE_HDFS) {
|
||||
rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
|
||||
} else if (getLocationType() == TFileType.FILE_S3 || getLocationType() == TFileType.FILE_BROKER) {
|
||||
// need full path
|
||||
rangeDesc.setPath(fileSplit.getPath().toString());
|
||||
}
|
||||
return rangeDesc;
|
||||
}
|
||||
|
||||
protected TFileType getLocationType() throws UserException {
|
||||
throw new NotImplementedException("");
|
||||
}
|
||||
|
||||
protected TFileFormatType getFileFormatType() throws UserException {
|
||||
throw new NotImplementedException("");
|
||||
}
|
||||
|
||||
protected TFileAttributes getFileAttributes() throws UserException {
|
||||
throw new NotImplementedException("");
|
||||
}
|
||||
|
||||
protected List<String> getPathPartitionKeys() throws UserException {
|
||||
throw new NotImplementedException("");
|
||||
}
|
||||
|
||||
protected TableIf getTargetTable() throws UserException {
|
||||
throw new NotImplementedException("");
|
||||
}
|
||||
|
||||
protected Map<String, String> getLocationProperties() throws UserException {
|
||||
throw new NotImplementedException("");
|
||||
}
|
||||
|
||||
// eg: hdfs://namenode s3://buckets
|
||||
protected String getFsName(FileSplit split) {
|
||||
String fullPath = split.getPath().toUri().toString();
|
||||
String filePath = split.getPath().toUri().getPath();
|
||||
return fullPath.replace(filePath, "");
|
||||
}
|
||||
|
||||
protected static Optional<TFileType> getTFileType(String location) {
|
||||
if (location != null && !location.isEmpty()) {
|
||||
if (FeConstants.isObjStorage(location)) {
|
||||
return Optional.of(TFileType.FILE_S3);
|
||||
} else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) {
|
||||
return Optional.of(TFileType.FILE_HDFS);
|
||||
} else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) {
|
||||
return Optional.of(TFileType.FILE_LOCAL);
|
||||
} else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
|
||||
return Optional.of(TFileType.FILE_BROKER);
|
||||
} else if (location.startsWith(FeConstants.FS_PREFIX_GFS)) {
|
||||
return Optional.of(TFileType.FILE_BROKER);
|
||||
} else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
|
||||
return Optional.of(TFileType.FILE_BROKER);
|
||||
}
|
||||
}
|
||||
params.setSlotNameToSchemaPos(columnNameToPosition);
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
@ -27,6 +27,8 @@ import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.planner.FileLoadScanNode;
|
||||
import org.apache.doris.planner.PlanNodeId;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.spi.Split;
|
||||
import org.apache.doris.statistics.StatisticalType;
|
||||
import org.apache.doris.thrift.TExplainLevel;
|
||||
import org.apache.doris.thrift.TExpr;
|
||||
@ -41,9 +43,12 @@ import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ArrayListMultimap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Multimap;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
@ -55,6 +60,8 @@ import java.util.Map;
|
||||
public class FileScanNode extends ExternalScanNode {
|
||||
private static final Logger LOG = LogManager.getLogger(FileScanNode.class);
|
||||
|
||||
public static final long DEFAULT_SPLIT_SIZE = 128 * 1024 * 1024; // 128MB
|
||||
|
||||
// For explain
|
||||
protected long inputSplitsNum = 0;
|
||||
protected long totalFileSize = 0;
|
||||
@ -152,24 +159,17 @@ public class FileScanNode extends ExternalScanNode {
|
||||
return output.toString();
|
||||
}
|
||||
|
||||
// TODO: Keep 2 versions of createScanRangeLocations, will fix this while refactor split and assignment code.
|
||||
// TODO: This api is for load job only. Will remove it later.
|
||||
protected void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context,
|
||||
FileScanProviderIf scanProvider)
|
||||
LoadScanProvider scanProvider)
|
||||
throws UserException {
|
||||
scanProvider.createScanRangeLocations(context, backendPolicy, scanRangeLocations);
|
||||
}
|
||||
|
||||
protected void createScanRangeLocations(List<Expr> conjuncts, TFileScanRangeParams params,
|
||||
FileScanProviderIf scanProvider)
|
||||
throws UserException {
|
||||
scanProvider.createScanRangeLocations(conjuncts, params, backendPolicy, scanRangeLocations);
|
||||
}
|
||||
|
||||
protected void setDefaultValueExprs(FileScanProviderIf scanProvider,
|
||||
protected void setDefaultValueExprs(TableIf tbl,
|
||||
Map<String, SlotDescriptor> slotDescByName,
|
||||
TFileScanRangeParams params,
|
||||
boolean useVarcharAsNull) throws UserException {
|
||||
TableIf tbl = scanProvider.getTargetTable();
|
||||
Preconditions.checkNotNull(tbl);
|
||||
TExpr tExpr = new TExpr();
|
||||
tExpr.setNodes(Lists.newArrayList());
|
||||
@ -210,4 +210,54 @@ public class FileScanNode extends ExternalScanNode {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length,
|
||||
boolean splittable, List<String> partitionValues) throws IOException {
|
||||
if (blockLocations == null) {
|
||||
blockLocations = new BlockLocation[0];
|
||||
}
|
||||
long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
|
||||
if (splitSize <= 0) {
|
||||
splitSize = blockSize;
|
||||
}
|
||||
// Min split size is DEFAULT_SPLIT_SIZE(128MB).
|
||||
splitSize = Math.max(splitSize, DEFAULT_SPLIT_SIZE);
|
||||
List<Split> result = Lists.newArrayList();
|
||||
if (!splittable) {
|
||||
LOG.debug("Path {} is not splittable.", path);
|
||||
String[] hosts = blockLocations.length == 0 ? null : blockLocations[0].getHosts();
|
||||
result.add(new FileSplit(path, 0, length, length, hosts, partitionValues));
|
||||
return result;
|
||||
}
|
||||
long bytesRemaining;
|
||||
for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D;
|
||||
bytesRemaining -= splitSize) {
|
||||
int location = getBlockIndex(blockLocations, length - bytesRemaining);
|
||||
String[] hosts = location == -1 ? null : blockLocations[location].getHosts();
|
||||
result.add(new FileSplit(path, length - bytesRemaining, splitSize, length, hosts, partitionValues));
|
||||
}
|
||||
if (bytesRemaining != 0L) {
|
||||
int location = getBlockIndex(blockLocations, length - bytesRemaining);
|
||||
String[] hosts = location == -1 ? null : blockLocations[location].getHosts();
|
||||
result.add(new FileSplit(path, length - bytesRemaining, bytesRemaining, length, hosts, partitionValues));
|
||||
}
|
||||
|
||||
LOG.debug("Path {} includes {} splits.", path, result.size());
|
||||
return result;
|
||||
}
|
||||
|
||||
private int getBlockIndex(BlockLocation[] blkLocations, long offset) {
|
||||
if (blkLocations == null || blkLocations.length == 0) {
|
||||
return -1;
|
||||
}
|
||||
for (int i = 0; i < blkLocations.length; ++i) {
|
||||
if (blkLocations[i].getOffset() <= offset
|
||||
&& offset < blkLocations[i].getOffset() + blkLocations[i].getLength()) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
BlockLocation last = blkLocations[blkLocations.length - 1];
|
||||
long fileLength = last.getOffset() + last.getLength() - 1L;
|
||||
throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,61 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.planner.external;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.planner.FileLoadScanNode;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
import org.apache.doris.thrift.TFileScanRangeParams;
|
||||
import org.apache.doris.thrift.TFileType;
|
||||
import org.apache.doris.thrift.TScanRangeLocations;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public interface FileScanProviderIf {
|
||||
// Return parquet/orc/text, etc.
|
||||
TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException;
|
||||
|
||||
// Return S3/HDSF, etc.
|
||||
TFileType getLocationType() throws DdlException, MetaNotFoundException;
|
||||
|
||||
// return properties for S3/HDFS, etc.
|
||||
Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException;
|
||||
|
||||
List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException;
|
||||
|
||||
FileLoadScanNode.ParamCreateContext createContext(Analyzer analyzer) throws UserException;
|
||||
|
||||
void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context, FederationBackendPolicy backendPolicy,
|
||||
List<TScanRangeLocations> scanRangeLocations) throws UserException;
|
||||
|
||||
void createScanRangeLocations(List<Expr> conjuncts, TFileScanRangeParams params,
|
||||
FederationBackendPolicy backendPolicy,
|
||||
List<TScanRangeLocations> scanRangeLocations) throws UserException;
|
||||
|
||||
int getInputSplitNum();
|
||||
|
||||
long getInputFileSize();
|
||||
|
||||
TableIf getTargetTable();
|
||||
}
|
||||
@ -17,7 +17,7 @@
|
||||
|
||||
package org.apache.doris.planner.external;
|
||||
|
||||
import org.apache.doris.planner.Split;
|
||||
import org.apache.doris.spi.Split;
|
||||
|
||||
import lombok.Data;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path;
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
public class FileSplit extends Split {
|
||||
public class FileSplit implements Split {
|
||||
protected Path path;
|
||||
protected long start;
|
||||
// length of this split, in bytes
|
||||
@ -34,6 +34,7 @@ public class FileSplit extends Split {
|
||||
// -1 means unset.
|
||||
// If the file length is not set, the file length will be fetched from the file system.
|
||||
protected long fileLength;
|
||||
protected String[] hosts;
|
||||
protected TableFormatType tableFormatType;
|
||||
// The values of partitions.
|
||||
// e.g for file : hdfs://path/to/table/part1=a/part2=b/datafile
|
||||
@ -46,15 +47,16 @@ public class FileSplit extends Split {
|
||||
this.start = start;
|
||||
this.length = length;
|
||||
this.fileLength = fileLength;
|
||||
this.hosts = hosts;
|
||||
this.hosts = hosts == null ? new String[0] : hosts;
|
||||
this.partitionValues = partitionValues;
|
||||
}
|
||||
|
||||
public String[] getHosts() {
|
||||
if (this.hosts == null) {
|
||||
return new String[]{};
|
||||
} else {
|
||||
return this.hosts;
|
||||
}
|
||||
return hosts;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getInfo() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,34 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.planner.external;
|
||||
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
|
||||
import org.apache.hadoop.hive.metastore.api.Table;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public abstract class HMSTableScanProvider extends QueryScanProvider {
|
||||
|
||||
public abstract String getMetaStoreUrl();
|
||||
|
||||
public abstract Table getRemoteHiveTable() throws DdlException, MetaNotFoundException;
|
||||
|
||||
public abstract Map<String, String> getTableProperties() throws MetaNotFoundException;
|
||||
}
|
||||
@ -17,32 +17,34 @@
|
||||
|
||||
package org.apache.doris.planner.external;
|
||||
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.SlotDescriptor;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
|
||||
import org.apache.doris.catalog.ListPartitionItem;
|
||||
import org.apache.doris.catalog.PartitionItem;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.datasource.HMSExternalCatalog;
|
||||
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
|
||||
import org.apache.doris.datasource.hive.HivePartition;
|
||||
import org.apache.doris.external.hive.util.HiveUtil;
|
||||
import org.apache.doris.fs.FileSystemFactory;
|
||||
import org.apache.doris.fs.RemoteFiles;
|
||||
import org.apache.doris.fs.remote.RemoteFileSystem;
|
||||
import org.apache.doris.planner.ColumnRange;
|
||||
import org.apache.doris.planner.ListPartitionPrunerV2;
|
||||
import org.apache.doris.planner.Split;
|
||||
import org.apache.doris.planner.Splitter;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.planner.PlanNodeId;
|
||||
import org.apache.doris.spi.Split;
|
||||
import org.apache.doris.statistics.StatisticalType;
|
||||
import org.apache.doris.thrift.TFileAttributes;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
import org.apache.doris.thrift.TFileTextScanRangeParams;
|
||||
import org.apache.doris.thrift.TFileType;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.InputFormat;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.hadoop.hive.metastore.api.FieldSchema;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@ -50,23 +52,52 @@ import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class HiveSplitter implements Splitter {
|
||||
public class HiveScanNode extends FileQueryScanNode {
|
||||
private static final Logger LOG = LogManager.getLogger(HiveScanNode.class);
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(HiveSplitter.class);
|
||||
public static final String PROP_FIELD_DELIMITER = "field.delim";
|
||||
public static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01"
|
||||
public static final String DEFAULT_LINE_DELIMITER = "\n";
|
||||
|
||||
private HMSExternalTable hmsTable;
|
||||
private Map<String, ColumnRange> columnNameToRange;
|
||||
private int totalPartitionNum = 0;
|
||||
private int readPartitionNum = 0;
|
||||
private final HMSExternalTable hmsTable;
|
||||
|
||||
public HiveSplitter(HMSExternalTable hmsTable, Map<String, ColumnRange> columnNameToRange) {
|
||||
this.hmsTable = hmsTable;
|
||||
this.columnNameToRange = columnNameToRange;
|
||||
/**
|
||||
* * External file scan node for Query Hive table
|
||||
* needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
|
||||
* eg: s3 tvf
|
||||
* These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check
|
||||
*/
|
||||
public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
|
||||
super(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv);
|
||||
hmsTable = (HMSExternalTable) desc.getTable();
|
||||
}
|
||||
|
||||
public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
|
||||
StatisticalType statisticalType, boolean needCheckColumnPriv) {
|
||||
super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
|
||||
hmsTable = (HMSExternalTable) desc.getTable();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Split> getSplits(List<Expr> exprs) throws UserException {
|
||||
protected void doInitialize() throws UserException {
|
||||
super.doInitialize();
|
||||
genSlotToSchemaIdMap();
|
||||
String inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat();
|
||||
if (inputFormat.contains("TextInputFormat")) {
|
||||
for (SlotDescriptor slot : desc.getSlots()) {
|
||||
if (!slot.getType().isScalarType()) {
|
||||
throw new UserException("For column `" + slot.getColumn().getName()
|
||||
+ "`, The column types ARRAY/MAP/STRUCT are not supported yet"
|
||||
+ " for text input format of Hive. ");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<Split> getSplits() throws UserException {
|
||||
long start = System.currentTimeMillis();
|
||||
try {
|
||||
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
|
||||
@ -78,14 +109,7 @@ public class HiveSplitter implements Splitter {
|
||||
hivePartitionValues = cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(),
|
||||
partitionColumnTypes);
|
||||
}
|
||||
Map<String, String> properties = hmsTable.getCatalog().getCatalogProperty().getProperties();
|
||||
boolean useSelfSplitter = true;
|
||||
if (properties.containsKey(HMSExternalCatalog.ENABLE_SELF_SPLITTER)
|
||||
&& properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("false")) {
|
||||
LOG.debug("Using self splitter for hmsTable {}", hmsTable.getName());
|
||||
useSelfSplitter = false;
|
||||
}
|
||||
|
||||
boolean useSelfSplitter = hmsTable.getCatalog().useSelfSplitter();
|
||||
List<Split> allFiles = Lists.newArrayList();
|
||||
if (hivePartitionValues != null) {
|
||||
// 2. prune partitions by expr
|
||||
@ -135,85 +159,88 @@ public class HiveSplitter implements Splitter {
|
||||
|
||||
private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
|
||||
List<Split> allFiles, boolean useSelfSplitter) throws IOException {
|
||||
|
||||
for (HiveMetaStoreCache.FileCacheValue fileCacheValue :
|
||||
cache.getFilesByPartitions(partitions, useSelfSplitter)) {
|
||||
// This if branch is to support old splitter, will remove later.
|
||||
if (fileCacheValue.getSplits() != null) {
|
||||
allFiles.addAll(fileCacheValue.getSplits());
|
||||
}
|
||||
if (fileCacheValue.getFiles() != null) {
|
||||
boolean isSplittable = fileCacheValue.isSplittable();
|
||||
for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
|
||||
allFiles.addAll(splitFile(status, isSplittable, fileCacheValue.getPartitionValues()));
|
||||
allFiles.addAll(splitFile(status.getPath(), status.getBlockSize(),
|
||||
status.getBlockLocations(), status.getLength(),
|
||||
isSplittable, fileCacheValue.getPartitionValues()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private List<Split> splitFile(HiveMetaStoreCache.HiveFileStatus status,
|
||||
boolean splittable, List<String> partitionValues) throws IOException {
|
||||
List<Split> result = Lists.newArrayList();
|
||||
if (!splittable) {
|
||||
LOG.debug("Path {} is not splittable.", status.getPath());
|
||||
BlockLocation block = status.getBlockLocations()[0];
|
||||
result.add(new FileSplit(status.getPath(), 0, status.getLength(),
|
||||
status.getLength(), block.getHosts(), partitionValues));
|
||||
return result;
|
||||
}
|
||||
long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
|
||||
if (splitSize <= 0) {
|
||||
splitSize = status.getBlockSize();
|
||||
}
|
||||
// Min split size is DEFAULT_SPLIT_SIZE(128MB).
|
||||
splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : DEFAULT_SPLIT_SIZE;
|
||||
BlockLocation[] blockLocations = status.getBlockLocations();
|
||||
long length = status.getLength();
|
||||
long bytesRemaining;
|
||||
for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D;
|
||||
bytesRemaining -= splitSize) {
|
||||
int location = getBlockIndex(blockLocations, length - bytesRemaining);
|
||||
result.add(new FileSplit(status.getPath(), length - bytesRemaining,
|
||||
splitSize, length, blockLocations[location].getHosts(), partitionValues));
|
||||
}
|
||||
if (bytesRemaining != 0L) {
|
||||
int location = getBlockIndex(blockLocations, length - bytesRemaining);
|
||||
result.add(new FileSplit(status.getPath(), length - bytesRemaining,
|
||||
bytesRemaining, length, blockLocations[location].getHosts(), partitionValues));
|
||||
}
|
||||
|
||||
LOG.debug("Path {} includes {} splits.", status.getPath(), result.size());
|
||||
return result;
|
||||
@Override
|
||||
public List<String> getPathPartitionKeys() {
|
||||
return hmsTable.getRemoteTable().getPartitionKeys()
|
||||
.stream().map(FieldSchema::getName).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public int getTotalPartitionNum() {
|
||||
return totalPartitionNum;
|
||||
@Override
|
||||
public TableIf getTargetTable() {
|
||||
return hmsTable;
|
||||
}
|
||||
|
||||
public int getReadPartitionNum() {
|
||||
return readPartitionNum;
|
||||
@Override
|
||||
protected TFileType getLocationType() throws UserException {
|
||||
String location = hmsTable.getRemoteTable().getSd().getLocation();
|
||||
return getTFileType(location).orElseThrow(() ->
|
||||
new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName()));
|
||||
}
|
||||
|
||||
// Get File Status by using FileSystem API.
|
||||
public static HiveMetaStoreCache.FileCacheValue getFileCache(String location, InputFormat<?, ?> inputFormat,
|
||||
JobConf jobConf,
|
||||
List<String> partitionValues) throws UserException {
|
||||
HiveMetaStoreCache.FileCacheValue result = new HiveMetaStoreCache.FileCacheValue();
|
||||
result.setSplittable(HiveUtil.isSplittable(inputFormat, new Path(location), jobConf));
|
||||
RemoteFileSystem fs = FileSystemFactory.getByLocation(location, jobConf);
|
||||
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
|
||||
locatedFiles.locations().forEach(result::addFile);
|
||||
result.setPartitionValues(partitionValues);
|
||||
return result;
|
||||
@Override
|
||||
public TFileFormatType getFileFormatType() throws UserException {
|
||||
TFileFormatType type = null;
|
||||
String inputFormatName = hmsTable.getRemoteTable().getSd().getInputFormat();
|
||||
String hiveFormat = HiveMetaStoreClientHelper.HiveFileFormat.getFormat(inputFormatName);
|
||||
if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.PARQUET.getDesc())) {
|
||||
type = TFileFormatType.FORMAT_PARQUET;
|
||||
} else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.ORC.getDesc())) {
|
||||
type = TFileFormatType.FORMAT_ORC;
|
||||
} else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.TEXT_FILE.getDesc())) {
|
||||
type = TFileFormatType.FORMAT_CSV_PLAIN;
|
||||
}
|
||||
return type;
|
||||
}
|
||||
|
||||
private static int getBlockIndex(BlockLocation[] blkLocations, long offset) {
|
||||
for (int i = 0; i < blkLocations.length; ++i) {
|
||||
if (blkLocations[i].getOffset() <= offset
|
||||
&& offset < blkLocations[i].getOffset() + blkLocations[i].getLength()) {
|
||||
return i;
|
||||
@Override
|
||||
protected Map<String, String> getLocationProperties() throws UserException {
|
||||
return hmsTable.getCatalogProperties();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TFileAttributes getFileAttributes() throws UserException {
|
||||
TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
|
||||
textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
|
||||
.getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER));
|
||||
textParams.setLineDelimiter(DEFAULT_LINE_DELIMITER);
|
||||
TFileAttributes fileAttributes = new TFileAttributes();
|
||||
fileAttributes.setTextParams(textParams);
|
||||
fileAttributes.setHeaderType("");
|
||||
return fileAttributes;
|
||||
}
|
||||
|
||||
// To Support Hive 1.x orc internal column name like (_col0, _col1, _col2...)
|
||||
private void genSlotToSchemaIdMap() {
|
||||
List<Column> baseSchema = desc.getTable().getBaseSchema();
|
||||
Map<String, Integer> columnNameToPosition = Maps.newHashMap();
|
||||
for (SlotDescriptor slot : desc.getSlots()) {
|
||||
int idx = 0;
|
||||
for (Column col : baseSchema) {
|
||||
if (col.getName().equals(slot.getColumn().getName())) {
|
||||
columnNameToPosition.put(col.getName(), idx);
|
||||
break;
|
||||
}
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
BlockLocation last = blkLocations[blkLocations.length - 1];
|
||||
long fileLength = last.getOffset() + last.getLength() - 1L;
|
||||
throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength));
|
||||
params.setSlotNameToSchemaPos(columnNameToPosition);
|
||||
}
|
||||
}
|
||||
@ -1,139 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.planner.external;
|
||||
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.planner.ColumnRange;
|
||||
import org.apache.doris.thrift.TFileAttributes;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
import org.apache.doris.thrift.TFileTextScanRangeParams;
|
||||
import org.apache.doris.thrift.TFileType;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.hadoop.hive.metastore.api.FieldSchema;
|
||||
import org.apache.hadoop.hive.metastore.api.Table;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* A HiveScanProvider to get information for scan node.
|
||||
*/
|
||||
public class HiveScanProvider extends HMSTableScanProvider {
|
||||
private static final Logger LOG = LogManager.getLogger(HiveScanProvider.class);
|
||||
|
||||
private static final String PROP_FIELD_DELIMITER = "field.delim";
|
||||
private static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01"
|
||||
private static final String DEFAULT_LINE_DELIMITER = "\n";
|
||||
|
||||
protected HMSExternalTable hmsTable;
|
||||
protected final TupleDescriptor desc;
|
||||
protected Map<String, ColumnRange> columnNameToRange;
|
||||
|
||||
public HiveScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc,
|
||||
Map<String, ColumnRange> columnNameToRange) {
|
||||
this.hmsTable = hmsTable;
|
||||
this.desc = desc;
|
||||
this.columnNameToRange = columnNameToRange;
|
||||
this.splitter = new HiveSplitter(hmsTable, columnNameToRange);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableIf getTargetTable() {
|
||||
return hmsTable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
|
||||
TFileFormatType type = null;
|
||||
String inputFormatName = getRemoteHiveTable().getSd().getInputFormat();
|
||||
String hiveFormat = HiveMetaStoreClientHelper.HiveFileFormat.getFormat(inputFormatName);
|
||||
if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.PARQUET.getDesc())) {
|
||||
type = TFileFormatType.FORMAT_PARQUET;
|
||||
} else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.ORC.getDesc())) {
|
||||
type = TFileFormatType.FORMAT_ORC;
|
||||
} else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.TEXT_FILE.getDesc())) {
|
||||
type = TFileFormatType.FORMAT_CSV_PLAIN;
|
||||
}
|
||||
return type;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TFileType getLocationType() throws DdlException, MetaNotFoundException {
|
||||
String location = hmsTable.getRemoteTable().getSd().getLocation();
|
||||
return getTFileType(location).orElseThrow(() ->
|
||||
new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMetaStoreUrl() {
|
||||
return hmsTable.getMetastoreUri();
|
||||
}
|
||||
|
||||
public int getTotalPartitionNum() {
|
||||
return ((HiveSplitter) splitter).getTotalPartitionNum();
|
||||
}
|
||||
|
||||
public int getReadPartitionNum() {
|
||||
return ((HiveSplitter) splitter).getReadPartitionNum();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Table getRemoteHiveTable() throws DdlException, MetaNotFoundException {
|
||||
return hmsTable.getRemoteTable();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getTableProperties() throws MetaNotFoundException {
|
||||
// TODO: implement it when we really properties from remote table.
|
||||
return Maps.newHashMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException {
|
||||
return hmsTable.getCatalogProperties();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
|
||||
return getRemoteHiveTable().getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public TFileAttributes getFileAttributes() throws UserException {
|
||||
TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
|
||||
textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
|
||||
.getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER));
|
||||
textParams.setLineDelimiter(DEFAULT_LINE_DELIMITER);
|
||||
TFileAttributes fileAttributes = new TFileAttributes();
|
||||
fileAttributes.setTextParams(textParams);
|
||||
fileAttributes.setHeaderType("");
|
||||
return fileAttributes;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -18,34 +18,31 @@
|
||||
package org.apache.doris.planner.external;
|
||||
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.planner.ColumnRange;
|
||||
import org.apache.doris.planner.PlanNodeId;
|
||||
import org.apache.doris.statistics.StatisticalType;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A file scan provider for hudi.
|
||||
* HudiProvier is extended with hive since they both use input format interface to get the split.
|
||||
*/
|
||||
public class HudiScanProvider extends HiveScanProvider {
|
||||
|
||||
public HudiScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc,
|
||||
Map<String, ColumnRange> columnNameToRange) {
|
||||
super(hmsTable, desc, columnNameToRange);
|
||||
public class HudiScanNode extends HiveScanNode {
|
||||
/**
|
||||
* External file scan node for Query Hudi table
|
||||
* needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
|
||||
* eg: s3 tvf
|
||||
* These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check
|
||||
*/
|
||||
public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
|
||||
super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TFileFormatType getFileFormatType() throws DdlException {
|
||||
public TFileFormatType getFileFormatType() {
|
||||
return TFileFormatType.FORMAT_PARQUET;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
|
||||
public List<String> getPathPartitionKeys() {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
@ -1,155 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.planner.external;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.TableSnapshot;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.external.iceberg.util.IcebergUtils;
|
||||
import org.apache.doris.planner.Split;
|
||||
import org.apache.doris.planner.Splitter;
|
||||
import org.apache.doris.planner.external.iceberg.IcebergDeleteFileFilter;
|
||||
import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
|
||||
import org.apache.doris.planner.external.iceberg.IcebergSource;
|
||||
import org.apache.doris.planner.external.iceberg.IcebergSplit;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.iceberg.BaseTable;
|
||||
import org.apache.iceberg.DeleteFile;
|
||||
import org.apache.iceberg.FileContent;
|
||||
import org.apache.iceberg.FileScanTask;
|
||||
import org.apache.iceberg.HistoryEntry;
|
||||
import org.apache.iceberg.MetadataColumns;
|
||||
import org.apache.iceberg.TableScan;
|
||||
import org.apache.iceberg.exceptions.NotFoundException;
|
||||
import org.apache.iceberg.expressions.Expression;
|
||||
import org.apache.iceberg.types.Conversions;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
public class IcebergSplitter implements Splitter {
|
||||
private static final Logger LOG = LogManager.getLogger(IcebergSplitter.class);
|
||||
|
||||
private final IcebergSource icebergSource;
|
||||
private final Analyzer analyzer;
|
||||
|
||||
public IcebergSplitter(IcebergSource icebergSource, Analyzer analyzer) {
|
||||
this.icebergSource = icebergSource;
|
||||
this.analyzer = analyzer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Split> getSplits(List<Expr> exprs) throws UserException {
|
||||
List<Expression> expressions = new ArrayList<>();
|
||||
org.apache.iceberg.Table table = icebergSource.getIcebergTable();
|
||||
for (Expr conjunct : exprs) {
|
||||
Expression expression = IcebergUtils.convertToIcebergExpr(conjunct, table.schema());
|
||||
if (expression != null) {
|
||||
expressions.add(expression);
|
||||
}
|
||||
}
|
||||
TableScan scan = table.newScan();
|
||||
TableSnapshot tableSnapshot = icebergSource.getDesc().getRef().getTableSnapshot();
|
||||
if (tableSnapshot != null) {
|
||||
TableSnapshot.VersionType type = tableSnapshot.getType();
|
||||
try {
|
||||
if (type == TableSnapshot.VersionType.VERSION) {
|
||||
scan = scan.useSnapshot(tableSnapshot.getVersion());
|
||||
} else {
|
||||
long snapshotId = TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
|
||||
scan = scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId));
|
||||
}
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new UserException(e);
|
||||
}
|
||||
}
|
||||
for (Expression predicate : expressions) {
|
||||
scan = scan.filter(predicate);
|
||||
}
|
||||
List<Split> splits = new ArrayList<>();
|
||||
int formatVersion = ((BaseTable) table).operations().current().formatVersion();
|
||||
for (FileScanTask task : scan.planFiles()) {
|
||||
long fileSize = task.file().fileSizeInBytes();
|
||||
for (FileScanTask splitTask : task.split(128 * 1024 * 1024)) {
|
||||
String dataFilePath = splitTask.file().path().toString();
|
||||
IcebergSplit split = new IcebergSplit(new Path(dataFilePath), splitTask.start(),
|
||||
splitTask.length(), fileSize, new String[0]);
|
||||
split.setFormatVersion(formatVersion);
|
||||
if (formatVersion >= IcebergScanProvider.MIN_DELETE_FILE_SUPPORT_VERSION) {
|
||||
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
|
||||
}
|
||||
split.setTableFormatType(TableFormatType.ICEBERG);
|
||||
split.setAnalyzer(analyzer);
|
||||
splits.add(split);
|
||||
}
|
||||
}
|
||||
return splits;
|
||||
}
|
||||
|
||||
public static long getSnapshotIdAsOfTime(List<HistoryEntry> historyEntries, long asOfTimestamp) {
|
||||
// find history at or before asOfTimestamp
|
||||
HistoryEntry latestHistory = null;
|
||||
for (HistoryEntry entry : historyEntries) {
|
||||
if (entry.timestampMillis() <= asOfTimestamp) {
|
||||
if (latestHistory == null) {
|
||||
latestHistory = entry;
|
||||
continue;
|
||||
}
|
||||
if (entry.timestampMillis() > latestHistory.timestampMillis()) {
|
||||
latestHistory = entry;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (latestHistory == null) {
|
||||
throw new NotFoundException("No version history at or before "
|
||||
+ Instant.ofEpochMilli(asOfTimestamp));
|
||||
}
|
||||
return latestHistory.snapshotId();
|
||||
}
|
||||
|
||||
private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask spitTask) {
|
||||
List<IcebergDeleteFileFilter> filters = new ArrayList<>();
|
||||
for (DeleteFile delete : spitTask.deletes()) {
|
||||
if (delete.content() == FileContent.POSITION_DELETES) {
|
||||
ByteBuffer lowerBoundBytes = delete.lowerBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
|
||||
Optional<Long> positionLowerBound = Optional.ofNullable(lowerBoundBytes)
|
||||
.map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
|
||||
ByteBuffer upperBoundBytes = delete.upperBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
|
||||
Optional<Long> positionUpperBound = Optional.ofNullable(upperBoundBytes)
|
||||
.map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
|
||||
filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(),
|
||||
positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L)));
|
||||
} else if (delete.content() == FileContent.EQUALITY_DELETES) {
|
||||
// todo: filters.add(IcebergDeleteFileFilter.createEqualityDelete(delete.path().toString(),
|
||||
// delete.equalityFieldIds()));
|
||||
throw new IllegalStateException("Don't support equality delete file");
|
||||
} else {
|
||||
throw new IllegalStateException("Unknown delete content: " + delete.content());
|
||||
}
|
||||
}
|
||||
return filters;
|
||||
}
|
||||
}
|
||||
@ -18,7 +18,6 @@
|
||||
package org.apache.doris.planner.external;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.ImportColumnDesc;
|
||||
import org.apache.doris.analysis.IntLiteral;
|
||||
import org.apache.doris.analysis.SlotRef;
|
||||
@ -55,7 +54,7 @@ import com.google.common.collect.Maps;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class LoadScanProvider implements FileScanProviderIf {
|
||||
public class LoadScanProvider {
|
||||
|
||||
private FileGroupInfo fileGroupInfo;
|
||||
private TupleDescriptor destTupleDesc;
|
||||
@ -65,27 +64,22 @@ public class LoadScanProvider implements FileScanProviderIf {
|
||||
this.destTupleDesc = destTupleDesc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TFileType getLocationType() throws DdlException, MetaNotFoundException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileLoadScanNode.ParamCreateContext createContext(Analyzer analyzer) throws UserException {
|
||||
FileLoadScanNode.ParamCreateContext ctx = new FileLoadScanNode.ParamCreateContext();
|
||||
ctx.destTupleDescriptor = destTupleDesc;
|
||||
@ -138,7 +132,6 @@ public class LoadScanProvider implements FileScanProviderIf {
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context,
|
||||
FederationBackendPolicy backendPolicy,
|
||||
List<TScanRangeLocations> scanRangeLocations) throws UserException {
|
||||
@ -147,18 +140,10 @@ public class LoadScanProvider implements FileScanProviderIf {
|
||||
fileGroupInfo.createScanRangeLocations(context, backendPolicy, scanRangeLocations);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createScanRangeLocations(List<Expr> conjuncts, TFileScanRangeParams params,
|
||||
FederationBackendPolicy backendPolicy,
|
||||
List<TScanRangeLocations> scanRangeLocations) throws UserException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getInputSplitNum() {
|
||||
return fileGroupInfo.getFileStatuses().size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getInputFileSize() {
|
||||
long res = 0;
|
||||
for (TBrokerFileStatus fileStatus : fileGroupInfo.getFileStatuses()) {
|
||||
@ -250,7 +235,6 @@ public class LoadScanProvider implements FileScanProviderIf {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableIf getTargetTable() {
|
||||
return fileGroupInfo.getTargetTable();
|
||||
}
|
||||
|
||||
@ -1,225 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.planner.external;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.FsBroker;
|
||||
import org.apache.doris.catalog.HdfsResource;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.BrokerUtil;
|
||||
import org.apache.doris.planner.FileLoadScanNode;
|
||||
import org.apache.doris.planner.Split;
|
||||
import org.apache.doris.planner.Splitter;
|
||||
import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
|
||||
import org.apache.doris.planner.external.iceberg.IcebergSplit;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.thrift.TExternalScanRange;
|
||||
import org.apache.doris.thrift.TFileAttributes;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
import org.apache.doris.thrift.TFileRangeDesc;
|
||||
import org.apache.doris.thrift.TFileScanRange;
|
||||
import org.apache.doris.thrift.TFileScanRangeParams;
|
||||
import org.apache.doris.thrift.TFileType;
|
||||
import org.apache.doris.thrift.THdfsParams;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TScanRange;
|
||||
import org.apache.doris.thrift.TScanRangeLocation;
|
||||
import org.apache.doris.thrift.TScanRangeLocations;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
public abstract class QueryScanProvider implements FileScanProviderIf {
|
||||
public static final Logger LOG = LogManager.getLogger(QueryScanProvider.class);
|
||||
private int inputSplitNum = 0;
|
||||
private long inputFileSize = 0;
|
||||
protected Splitter splitter;
|
||||
|
||||
public abstract TFileAttributes getFileAttributes() throws UserException;
|
||||
|
||||
@Override
|
||||
public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context,
|
||||
FederationBackendPolicy backendPolicy,
|
||||
List<TScanRangeLocations> scanRangeLocations) throws UserException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileLoadScanNode.ParamCreateContext createContext(Analyzer analyzer) throws UserException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createScanRangeLocations(List<Expr> conjuncts, TFileScanRangeParams params,
|
||||
FederationBackendPolicy backendPolicy,
|
||||
List<TScanRangeLocations> scanRangeLocations) throws UserException {
|
||||
long start = System.currentTimeMillis();
|
||||
List<Split> inputSplits = splitter.getSplits(conjuncts);
|
||||
this.inputSplitNum = inputSplits.size();
|
||||
if (inputSplits.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
FileSplit inputSplit = (FileSplit) inputSplits.get(0);
|
||||
TFileType locationType = getLocationType();
|
||||
params.setFileType(locationType);
|
||||
TFileFormatType fileFormatType = getFileFormatType();
|
||||
params.setFormatType(getFileFormatType());
|
||||
if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN || fileFormatType == TFileFormatType.FORMAT_JSON) {
|
||||
params.setFileAttributes(getFileAttributes());
|
||||
}
|
||||
|
||||
// set hdfs params for hdfs file type.
|
||||
Map<String, String> locationProperties = getLocationProperties();
|
||||
if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
|
||||
String fsName = "";
|
||||
if (this instanceof TVFScanProvider) {
|
||||
fsName = ((TVFScanProvider) this).getFsName();
|
||||
} else {
|
||||
String fullPath = inputSplit.getPath().toUri().toString();
|
||||
String filePath = inputSplit.getPath().toUri().getPath();
|
||||
// eg:
|
||||
// hdfs://namenode
|
||||
// s3://buckets
|
||||
fsName = fullPath.replace(filePath, "");
|
||||
}
|
||||
THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
|
||||
tHdfsParams.setFsName(fsName);
|
||||
params.setHdfsParams(tHdfsParams);
|
||||
|
||||
if (locationType == TFileType.FILE_BROKER) {
|
||||
FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
|
||||
if (broker == null) {
|
||||
throw new UserException("No alive broker.");
|
||||
}
|
||||
params.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port));
|
||||
}
|
||||
} else if (locationType == TFileType.FILE_S3) {
|
||||
params.setProperties(locationProperties);
|
||||
}
|
||||
|
||||
List<String> pathPartitionKeys = getPathPartitionKeys();
|
||||
for (Split split : inputSplits) {
|
||||
TScanRangeLocations curLocations = newLocations(params, backendPolicy);
|
||||
FileSplit fileSplit = (FileSplit) split;
|
||||
|
||||
// If fileSplit has partition values, use the values collected from hive partitions.
|
||||
// Otherwise, use the values in file path.
|
||||
List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null
|
||||
? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false)
|
||||
: fileSplit.getPartitionValues();
|
||||
|
||||
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
|
||||
// external data lake table
|
||||
if (fileSplit instanceof IcebergSplit) {
|
||||
IcebergScanProvider.setIcebergParams(rangeDesc, (IcebergSplit) fileSplit);
|
||||
}
|
||||
|
||||
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
|
||||
LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}",
|
||||
curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(),
|
||||
fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts()));
|
||||
scanRangeLocations.add(curLocations);
|
||||
this.inputFileSize += fileSplit.getLength();
|
||||
}
|
||||
LOG.debug("create #{} ScanRangeLocations cost: {} ms",
|
||||
scanRangeLocations.size(), (System.currentTimeMillis() - start));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getInputSplitNum() {
|
||||
return this.inputSplitNum;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getInputFileSize() {
|
||||
return this.inputFileSize;
|
||||
}
|
||||
|
||||
private TScanRangeLocations newLocations(TFileScanRangeParams params, FederationBackendPolicy backendPolicy) {
|
||||
// Generate on file scan range
|
||||
TFileScanRange fileScanRange = new TFileScanRange();
|
||||
fileScanRange.setParams(params);
|
||||
|
||||
// Scan range
|
||||
TExternalScanRange externalScanRange = new TExternalScanRange();
|
||||
externalScanRange.setFileScanRange(fileScanRange);
|
||||
TScanRange scanRange = new TScanRange();
|
||||
scanRange.setExtScanRange(externalScanRange);
|
||||
|
||||
// Locations
|
||||
TScanRangeLocations locations = new TScanRangeLocations();
|
||||
locations.setScanRange(scanRange);
|
||||
|
||||
TScanRangeLocation location = new TScanRangeLocation();
|
||||
Backend selectedBackend = backendPolicy.getNextBe();
|
||||
location.setBackendId(selectedBackend.getId());
|
||||
location.setServer(new TNetworkAddress(selectedBackend.getIp(), selectedBackend.getBePort()));
|
||||
locations.addToLocations(location);
|
||||
|
||||
return locations;
|
||||
}
|
||||
|
||||
private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath,
|
||||
List<String> columnsFromPathKeys)
|
||||
throws DdlException, MetaNotFoundException {
|
||||
TFileRangeDesc rangeDesc = new TFileRangeDesc();
|
||||
rangeDesc.setStartOffset(fileSplit.getStart());
|
||||
rangeDesc.setSize(fileSplit.getLength());
|
||||
// fileSize only be used when format is orc or parquet and TFileType is broker
|
||||
// When TFileType is other type, it is not necessary
|
||||
rangeDesc.setFileSize(fileSplit.getFileLength());
|
||||
rangeDesc.setColumnsFromPath(columnsFromPath);
|
||||
rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
|
||||
|
||||
if (getLocationType() == TFileType.FILE_HDFS) {
|
||||
rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
|
||||
} else if (getLocationType() == TFileType.FILE_S3 || getLocationType() == TFileType.FILE_BROKER) {
|
||||
// need full path
|
||||
rangeDesc.setPath(fileSplit.getPath().toString());
|
||||
}
|
||||
return rangeDesc;
|
||||
}
|
||||
|
||||
protected static Optional<TFileType> getTFileType(String location) {
|
||||
if (location != null && !location.isEmpty()) {
|
||||
if (FeConstants.isObjStorage(location)) {
|
||||
return Optional.of(TFileType.FILE_S3);
|
||||
} else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) {
|
||||
return Optional.of(TFileType.FILE_HDFS);
|
||||
} else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) {
|
||||
return Optional.of(TFileType.FILE_LOCAL);
|
||||
} else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
|
||||
return Optional.of(TFileType.FILE_BROKER);
|
||||
} else if (location.startsWith(FeConstants.FS_PREFIX_GFS)) {
|
||||
return Optional.of(TFileType.FILE_BROKER);
|
||||
} else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
|
||||
return Optional.of(TFileType.FILE_BROKER);
|
||||
}
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
133
fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
vendored
Normal file
133
fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
vendored
Normal file
@ -0,0 +1,133 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.planner.external;
|
||||
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.catalog.FunctionGenTable;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.planner.PlanNodeId;
|
||||
import org.apache.doris.spi.Split;
|
||||
import org.apache.doris.statistics.StatisticalType;
|
||||
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
|
||||
import org.apache.doris.thrift.TBrokerFileStatus;
|
||||
import org.apache.doris.thrift.TFileAttributes;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
import org.apache.doris.thrift.TFileType;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class TVFScanNode extends FileQueryScanNode {
|
||||
private static final Logger LOG = LogManager.getLogger(TVFScanNode.class);
|
||||
|
||||
private final ExternalFileTableValuedFunction tableValuedFunction;
|
||||
private final FunctionGenTable table;
|
||||
|
||||
/**
|
||||
* External file scan node for table value function
|
||||
* needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
|
||||
* eg: s3 tvf
|
||||
* These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check
|
||||
*/
|
||||
public TVFScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
|
||||
super(id, desc, "TVF_SCAN_NODE", StatisticalType.TVF_SCAN_NODE, needCheckColumnPriv);
|
||||
table = (FunctionGenTable) this.desc.getTable();
|
||||
tableValuedFunction = (ExternalFileTableValuedFunction) table.getTvf();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doInitialize() throws UserException {
|
||||
Preconditions.checkNotNull(desc);
|
||||
computeColumnFilter();
|
||||
initBackendPolicy();
|
||||
initSchemaParams();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getFsName(FileSplit split) {
|
||||
return tableValuedFunction.getFsName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TFileAttributes getFileAttributes() throws UserException {
|
||||
return tableValuedFunction.getFileAttributes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
|
||||
return tableValuedFunction.getTFileFormatType();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TFileType getLocationType() throws DdlException, MetaNotFoundException {
|
||||
return tableValuedFunction.getTFileType();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException {
|
||||
return tableValuedFunction.getLocationProperties();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getPathPartitionKeys() {
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableIf getTargetTable() {
|
||||
return table;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Split> getSplits() throws UserException {
|
||||
List<Split> splits = Lists.newArrayList();
|
||||
List<TBrokerFileStatus> fileStatuses = tableValuedFunction.getFileStatuses();
|
||||
for (TBrokerFileStatus fileStatus : fileStatuses) {
|
||||
Path path = new Path(fileStatus.getPath());
|
||||
try {
|
||||
splits.addAll(splitFile(path, fileStatus.getBlockSize(), null, fileStatus.getSize(),
|
||||
fileStatus.isSplitable, null));
|
||||
} catch (IOException e) {
|
||||
LOG.warn("get file split failed for TVF: {}", path, e);
|
||||
throw new UserException(e);
|
||||
}
|
||||
}
|
||||
return splits;
|
||||
}
|
||||
|
||||
private void addFileSplits(Path path, long fileSize, long splitSize, List<Split> splits) {
|
||||
long bytesRemaining;
|
||||
for (bytesRemaining = fileSize; (double) bytesRemaining / (double) splitSize > 1.1D;
|
||||
bytesRemaining -= splitSize) {
|
||||
splits.add(new FileSplit(path, fileSize - bytesRemaining, splitSize, fileSize, new String[0], null));
|
||||
}
|
||||
if (bytesRemaining != 0L) {
|
||||
splits.add(new FileSplit(path, fileSize - bytesRemaining, bytesRemaining, fileSize, new String[0], null));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,85 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.planner.external;
|
||||
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.catalog.FunctionGenTable;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
|
||||
import org.apache.doris.thrift.TFileAttributes;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
import org.apache.doris.thrift.TFileType;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class TVFScanProvider extends QueryScanProvider {
|
||||
private FunctionGenTable tvfTable;
|
||||
private final TupleDescriptor desc;
|
||||
private ExternalFileTableValuedFunction tableValuedFunction;
|
||||
|
||||
public TVFScanProvider(FunctionGenTable tvfTable, TupleDescriptor desc,
|
||||
ExternalFileTableValuedFunction tableValuedFunction) {
|
||||
this.tvfTable = tvfTable;
|
||||
this.desc = desc;
|
||||
this.tableValuedFunction = tableValuedFunction;
|
||||
this.splitter = new TVFSplitter(tableValuedFunction);
|
||||
}
|
||||
|
||||
public String getFsName() {
|
||||
return tableValuedFunction.getFsName();
|
||||
}
|
||||
|
||||
// =========== implement abstract methods of QueryScanProvider =================
|
||||
@Override
|
||||
public TFileAttributes getFileAttributes() throws UserException {
|
||||
return tableValuedFunction.getFileAttributes();
|
||||
}
|
||||
|
||||
|
||||
// =========== implement interface methods of FileScanProviderIf ================
|
||||
@Override
|
||||
public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
|
||||
return tableValuedFunction.getTFileFormatType();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TFileType getLocationType() throws DdlException, MetaNotFoundException {
|
||||
return tableValuedFunction.getTFileType();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException {
|
||||
return tableValuedFunction.getLocationProperties();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableIf getTargetTable() {
|
||||
return tvfTable;
|
||||
}
|
||||
}
|
||||
@ -1,79 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.planner.external;
|
||||
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.planner.Split;
|
||||
import org.apache.doris.planner.Splitter;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
|
||||
import org.apache.doris.thrift.TBrokerFileStatus;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class TVFSplitter implements Splitter {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(TVFSplitter.class);
|
||||
|
||||
private ExternalFileTableValuedFunction tableValuedFunction;
|
||||
|
||||
public TVFSplitter(ExternalFileTableValuedFunction tableValuedFunction) {
|
||||
this.tableValuedFunction = tableValuedFunction;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Split> getSplits(List<Expr> exprs) throws UserException {
|
||||
List<Split> splits = Lists.newArrayList();
|
||||
List<TBrokerFileStatus> fileStatuses = tableValuedFunction.getFileStatuses();
|
||||
for (TBrokerFileStatus fileStatus : fileStatuses) {
|
||||
long fileLength = fileStatus.getSize();
|
||||
Path path = new Path(fileStatus.getPath());
|
||||
if (fileStatus.isSplitable) {
|
||||
long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
|
||||
if (splitSize <= 0) {
|
||||
splitSize = fileStatus.getBlockSize();
|
||||
}
|
||||
// Min split size is DEFAULT_SPLIT_SIZE(128MB).
|
||||
splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : DEFAULT_SPLIT_SIZE;
|
||||
addFileSplits(path, fileLength, splitSize, splits);
|
||||
} else {
|
||||
Split split = new FileSplit(path, 0, fileLength, fileLength, new String[0], null);
|
||||
splits.add(split);
|
||||
}
|
||||
}
|
||||
return splits;
|
||||
}
|
||||
|
||||
private void addFileSplits(Path path, long fileSize, long splitSize, List<Split> splits) {
|
||||
long bytesRemaining;
|
||||
for (bytesRemaining = fileSize; (double) bytesRemaining / (double) splitSize > 1.1D;
|
||||
bytesRemaining -= splitSize) {
|
||||
splits.add(new FileSplit(path, fileSize - bytesRemaining, splitSize, fileSize, new String[0], null));
|
||||
}
|
||||
if (bytesRemaining != 0L) {
|
||||
splits.add(new FileSplit(path, fileSize - bytesRemaining, bytesRemaining, fileSize, new String[0], null));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -26,8 +26,9 @@ import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.datasource.ExternalCatalog;
|
||||
import org.apache.doris.planner.ColumnRange;
|
||||
import org.apache.doris.planner.external.HiveScanProvider;
|
||||
import org.apache.doris.planner.external.HiveScanNode;
|
||||
import org.apache.doris.thrift.TFileAttributes;
|
||||
import org.apache.doris.thrift.TFileTextScanRangeParams;
|
||||
|
||||
import org.apache.iceberg.TableProperties;
|
||||
|
||||
@ -36,15 +37,14 @@ import java.util.Map;
|
||||
public class IcebergHMSSource implements IcebergSource {
|
||||
|
||||
private final HMSExternalTable hmsTable;
|
||||
private final HiveScanProvider hiveScanProvider;
|
||||
|
||||
private final TupleDescriptor desc;
|
||||
private final Map<String, ColumnRange> columnNameToRange;
|
||||
|
||||
public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc,
|
||||
Map<String, ColumnRange> columnNameToRange) {
|
||||
this.hiveScanProvider = new HiveScanProvider(hmsTable, desc, columnNameToRange);
|
||||
this.hmsTable = hmsTable;
|
||||
this.desc = desc;
|
||||
this.columnNameToRange = columnNameToRange;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -54,8 +54,8 @@ public class IcebergHMSSource implements IcebergSource {
|
||||
|
||||
@Override
|
||||
public String getFileFormat() throws DdlException, MetaNotFoundException {
|
||||
return hiveScanProvider.getRemoteHiveTable().getParameters()
|
||||
.getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
|
||||
return hmsTable.getRemoteTable().getParameters()
|
||||
.getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
|
||||
}
|
||||
|
||||
public org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException {
|
||||
@ -64,12 +64,19 @@ public class IcebergHMSSource implements IcebergSource {
|
||||
|
||||
@Override
|
||||
public TableIf getTargetTable() {
|
||||
return hiveScanProvider.getTargetTable();
|
||||
return hmsTable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TFileAttributes getFileAttributes() throws UserException {
|
||||
return hiveScanProvider.getFileAttributes();
|
||||
TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
|
||||
textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
|
||||
.getOrDefault(HiveScanNode.PROP_FIELD_DELIMITER, HiveScanNode.DEFAULT_FIELD_DELIMITER));
|
||||
textParams.setLineDelimiter(HiveScanNode.DEFAULT_LINE_DELIMITER);
|
||||
TFileAttributes fileAttributes = new TFileAttributes();
|
||||
fileAttributes.setTextParams(textParams);
|
||||
fileAttributes.setHeaderType("");
|
||||
return fileAttributes;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
286
fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
vendored
Normal file
286
fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
vendored
Normal file
@ -0,0 +1,286 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.planner.external.iceberg;
|
||||
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.TableSnapshot;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.external.ExternalTable;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.catalog.external.IcebergExternalTable;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
|
||||
import org.apache.doris.external.iceberg.util.IcebergUtils;
|
||||
import org.apache.doris.planner.PlanNodeId;
|
||||
import org.apache.doris.planner.external.FileQueryScanNode;
|
||||
import org.apache.doris.planner.external.TableFormatType;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.spi.Split;
|
||||
import org.apache.doris.statistics.StatisticalType;
|
||||
import org.apache.doris.thrift.TFileAttributes;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
import org.apache.doris.thrift.TFileRangeDesc;
|
||||
import org.apache.doris.thrift.TFileType;
|
||||
import org.apache.doris.thrift.TIcebergDeleteFileDesc;
|
||||
import org.apache.doris.thrift.TIcebergFileDesc;
|
||||
import org.apache.doris.thrift.TTableFormatFileDesc;
|
||||
|
||||
import avro.shaded.com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.iceberg.BaseTable;
|
||||
import org.apache.iceberg.DeleteFile;
|
||||
import org.apache.iceberg.FileContent;
|
||||
import org.apache.iceberg.FileScanTask;
|
||||
import org.apache.iceberg.HistoryEntry;
|
||||
import org.apache.iceberg.MetadataColumns;
|
||||
import org.apache.iceberg.PartitionField;
|
||||
import org.apache.iceberg.Table;
|
||||
import org.apache.iceberg.TableScan;
|
||||
import org.apache.iceberg.exceptions.NotFoundException;
|
||||
import org.apache.iceberg.expressions.Expression;
|
||||
import org.apache.iceberg.types.Conversions;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class IcebergScanNode extends FileQueryScanNode {
|
||||
|
||||
public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
|
||||
|
||||
private IcebergSource source;
|
||||
|
||||
/**
|
||||
* External file scan node for Query iceberg table
|
||||
* needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
|
||||
* eg: s3 tvf
|
||||
* These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check
|
||||
*/
|
||||
public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
|
||||
super(id, desc, "ICEBERG_SCAN_NODE", StatisticalType.ICEBERG_SCAN_NODE, needCheckColumnPriv);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doInitialize() throws UserException {
|
||||
ExternalTable table = (ExternalTable) desc.getTable();
|
||||
if (table.isView()) {
|
||||
throw new AnalysisException(
|
||||
String.format("Querying external view '%s.%s' is not supported", table.getDbName(), table.getName()));
|
||||
}
|
||||
computeColumnFilter();
|
||||
initBackendPolicy();
|
||||
if (table instanceof HMSExternalTable) {
|
||||
source = new IcebergHMSSource((HMSExternalTable) table, desc, columnNameToRange);
|
||||
} else if (table instanceof IcebergExternalTable) {
|
||||
String catalogType = ((IcebergExternalTable) table).getIcebergCatalogType();
|
||||
switch (catalogType) {
|
||||
case IcebergExternalCatalog.ICEBERG_HMS:
|
||||
case IcebergExternalCatalog.ICEBERG_REST:
|
||||
case IcebergExternalCatalog.ICEBERG_DLF:
|
||||
case IcebergExternalCatalog.ICEBERG_GLUE:
|
||||
source = new IcebergApiSource((IcebergExternalTable) table, desc, columnNameToRange);
|
||||
break;
|
||||
default:
|
||||
throw new UserException("Unknown iceberg catalog type: " + catalogType);
|
||||
}
|
||||
}
|
||||
Preconditions.checkNotNull(source);
|
||||
initSchemaParams();
|
||||
}
|
||||
|
||||
public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) {
|
||||
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
|
||||
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
|
||||
TIcebergFileDesc fileDesc = new TIcebergFileDesc();
|
||||
int formatVersion = icebergSplit.getFormatVersion();
|
||||
fileDesc.setFormatVersion(formatVersion);
|
||||
if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
|
||||
fileDesc.setContent(FileContent.DATA.id());
|
||||
} else {
|
||||
for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) {
|
||||
TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
|
||||
deleteFileDesc.setPath(filter.getDeleteFilePath());
|
||||
if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
|
||||
fileDesc.setContent(FileContent.POSITION_DELETES.id());
|
||||
IcebergDeleteFileFilter.PositionDelete positionDelete =
|
||||
(IcebergDeleteFileFilter.PositionDelete) filter;
|
||||
OptionalLong lowerBound = positionDelete.getPositionLowerBound();
|
||||
OptionalLong upperBound = positionDelete.getPositionUpperBound();
|
||||
if (lowerBound.isPresent()) {
|
||||
deleteFileDesc.setPositionLowerBound(lowerBound.getAsLong());
|
||||
}
|
||||
if (upperBound.isPresent()) {
|
||||
deleteFileDesc.setPositionUpperBound(upperBound.getAsLong());
|
||||
}
|
||||
} else {
|
||||
fileDesc.setContent(FileContent.EQUALITY_DELETES.id());
|
||||
IcebergDeleteFileFilter.EqualityDelete equalityDelete =
|
||||
(IcebergDeleteFileFilter.EqualityDelete) filter;
|
||||
deleteFileDesc.setFieldIds(equalityDelete.getFieldIds());
|
||||
}
|
||||
fileDesc.addToDeleteFiles(deleteFileDesc);
|
||||
}
|
||||
}
|
||||
tableFormatFileDesc.setIcebergParams(fileDesc);
|
||||
rangeDesc.setTableFormatParams(tableFormatFileDesc);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Split> getSplits() throws UserException {
|
||||
List<Expression> expressions = new ArrayList<>();
|
||||
org.apache.iceberg.Table table = source.getIcebergTable();
|
||||
for (Expr conjunct : conjuncts) {
|
||||
Expression expression = IcebergUtils.convertToIcebergExpr(conjunct, table.schema());
|
||||
if (expression != null) {
|
||||
expressions.add(expression);
|
||||
}
|
||||
}
|
||||
TableScan scan = table.newScan();
|
||||
TableSnapshot tableSnapshot = source.getDesc().getRef().getTableSnapshot();
|
||||
if (tableSnapshot != null) {
|
||||
TableSnapshot.VersionType type = tableSnapshot.getType();
|
||||
try {
|
||||
if (type == TableSnapshot.VersionType.VERSION) {
|
||||
scan = scan.useSnapshot(tableSnapshot.getVersion());
|
||||
} else {
|
||||
long snapshotId = TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
|
||||
scan = scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId));
|
||||
}
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new UserException(e);
|
||||
}
|
||||
}
|
||||
for (Expression predicate : expressions) {
|
||||
scan = scan.filter(predicate);
|
||||
}
|
||||
List<Split> splits = new ArrayList<>();
|
||||
int formatVersion = ((BaseTable) table).operations().current().formatVersion();
|
||||
// Min split size is DEFAULT_SPLIT_SIZE(128MB).
|
||||
long splitSize = Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(), DEFAULT_SPLIT_SIZE);
|
||||
for (FileScanTask task : scan.planFiles()) {
|
||||
long fileSize = task.file().fileSizeInBytes();
|
||||
for (FileScanTask splitTask : task.split(splitSize)) {
|
||||
String dataFilePath = splitTask.file().path().toString();
|
||||
IcebergSplit split = new IcebergSplit(new Path(dataFilePath), splitTask.start(),
|
||||
splitTask.length(), fileSize, new String[0]);
|
||||
split.setFormatVersion(formatVersion);
|
||||
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
|
||||
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
|
||||
}
|
||||
split.setTableFormatType(TableFormatType.ICEBERG);
|
||||
splits.add(split);
|
||||
}
|
||||
}
|
||||
return splits;
|
||||
}
|
||||
|
||||
private long getSnapshotIdAsOfTime(List<HistoryEntry> historyEntries, long asOfTimestamp) {
|
||||
// find history at or before asOfTimestamp
|
||||
HistoryEntry latestHistory = null;
|
||||
for (HistoryEntry entry : historyEntries) {
|
||||
if (entry.timestampMillis() <= asOfTimestamp) {
|
||||
if (latestHistory == null) {
|
||||
latestHistory = entry;
|
||||
continue;
|
||||
}
|
||||
if (entry.timestampMillis() > latestHistory.timestampMillis()) {
|
||||
latestHistory = entry;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (latestHistory == null) {
|
||||
throw new NotFoundException("No version history at or before "
|
||||
+ Instant.ofEpochMilli(asOfTimestamp));
|
||||
}
|
||||
return latestHistory.snapshotId();
|
||||
}
|
||||
|
||||
private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask spitTask) {
|
||||
List<IcebergDeleteFileFilter> filters = new ArrayList<>();
|
||||
for (DeleteFile delete : spitTask.deletes()) {
|
||||
if (delete.content() == FileContent.POSITION_DELETES) {
|
||||
ByteBuffer lowerBoundBytes = delete.lowerBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
|
||||
Optional<Long> positionLowerBound = Optional.ofNullable(lowerBoundBytes)
|
||||
.map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
|
||||
ByteBuffer upperBoundBytes = delete.upperBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
|
||||
Optional<Long> positionUpperBound = Optional.ofNullable(upperBoundBytes)
|
||||
.map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
|
||||
filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(),
|
||||
positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L)));
|
||||
} else if (delete.content() == FileContent.EQUALITY_DELETES) {
|
||||
// todo: filters.add(IcebergDeleteFileFilter.createEqualityDelete(delete.path().toString(),
|
||||
throw new IllegalStateException("Don't support equality delete file");
|
||||
} else {
|
||||
throw new IllegalStateException("Unknown delete content: " + delete.content());
|
||||
}
|
||||
}
|
||||
return filters;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TFileType getLocationType() throws UserException {
|
||||
Table icebergTable = source.getIcebergTable();
|
||||
String location = icebergTable.location();
|
||||
return getTFileType(location).orElseThrow(() ->
|
||||
new DdlException("Unknown file location " + location + " for iceberg table " + icebergTable.name()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TFileFormatType getFileFormatType() throws UserException {
|
||||
TFileFormatType type;
|
||||
String icebergFormat = source.getFileFormat();
|
||||
if (icebergFormat.equalsIgnoreCase("parquet")) {
|
||||
type = TFileFormatType.FORMAT_PARQUET;
|
||||
} else if (icebergFormat.equalsIgnoreCase("orc")) {
|
||||
type = TFileFormatType.FORMAT_ORC;
|
||||
} else {
|
||||
throw new DdlException(String.format("Unsupported format name: %s for iceberg table.", icebergFormat));
|
||||
}
|
||||
return type;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TFileAttributes getFileAttributes() throws UserException {
|
||||
return source.getFileAttributes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getPathPartitionKeys() throws UserException {
|
||||
return source.getIcebergTable().spec().fields().stream().map(PartitionField::name)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableIf getTargetTable() {
|
||||
return source.getTargetTable();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getLocationProperties() throws UserException {
|
||||
return source.getCatalog().getProperties();
|
||||
}
|
||||
}
|
||||
@ -1,137 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.planner.external.iceberg;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.planner.external.IcebergSplitter;
|
||||
import org.apache.doris.planner.external.QueryScanProvider;
|
||||
import org.apache.doris.thrift.TFileAttributes;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
import org.apache.doris.thrift.TFileRangeDesc;
|
||||
import org.apache.doris.thrift.TFileType;
|
||||
import org.apache.doris.thrift.TIcebergDeleteFileDesc;
|
||||
import org.apache.doris.thrift.TIcebergFileDesc;
|
||||
import org.apache.doris.thrift.TTableFormatFileDesc;
|
||||
|
||||
import org.apache.iceberg.FileContent;
|
||||
import org.apache.iceberg.PartitionField;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* A file scan provider for iceberg.
|
||||
*/
|
||||
public class IcebergScanProvider extends QueryScanProvider {
|
||||
|
||||
public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
|
||||
private final Analyzer analyzer;
|
||||
private final IcebergSource icebergSource;
|
||||
|
||||
public IcebergScanProvider(IcebergSource icebergSource, Analyzer analyzer) {
|
||||
this.icebergSource = icebergSource;
|
||||
this.analyzer = analyzer;
|
||||
this.splitter = new IcebergSplitter(icebergSource, analyzer);
|
||||
}
|
||||
|
||||
public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) {
|
||||
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
|
||||
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
|
||||
TIcebergFileDesc fileDesc = new TIcebergFileDesc();
|
||||
int formatVersion = icebergSplit.getFormatVersion();
|
||||
fileDesc.setFormatVersion(formatVersion);
|
||||
if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
|
||||
fileDesc.setContent(FileContent.DATA.id());
|
||||
} else {
|
||||
for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) {
|
||||
TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
|
||||
deleteFileDesc.setPath(filter.getDeleteFilePath());
|
||||
if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
|
||||
fileDesc.setContent(FileContent.POSITION_DELETES.id());
|
||||
IcebergDeleteFileFilter.PositionDelete positionDelete =
|
||||
(IcebergDeleteFileFilter.PositionDelete) filter;
|
||||
OptionalLong lowerBound = positionDelete.getPositionLowerBound();
|
||||
OptionalLong upperBound = positionDelete.getPositionUpperBound();
|
||||
if (lowerBound.isPresent()) {
|
||||
deleteFileDesc.setPositionLowerBound(lowerBound.getAsLong());
|
||||
}
|
||||
if (upperBound.isPresent()) {
|
||||
deleteFileDesc.setPositionUpperBound(upperBound.getAsLong());
|
||||
}
|
||||
} else {
|
||||
fileDesc.setContent(FileContent.EQUALITY_DELETES.id());
|
||||
IcebergDeleteFileFilter.EqualityDelete equalityDelete =
|
||||
(IcebergDeleteFileFilter.EqualityDelete) filter;
|
||||
deleteFileDesc.setFieldIds(equalityDelete.getFieldIds());
|
||||
}
|
||||
fileDesc.addToDeleteFiles(deleteFileDesc);
|
||||
}
|
||||
}
|
||||
tableFormatFileDesc.setIcebergParams(fileDesc);
|
||||
rangeDesc.setTableFormatParams(tableFormatFileDesc);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TFileType getLocationType() throws DdlException, MetaNotFoundException {
|
||||
org.apache.iceberg.Table table = icebergSource.getIcebergTable();
|
||||
String location = table.location();
|
||||
return getTFileType(location).orElseThrow(() ->
|
||||
new DdlException("Unknown file location " + location + " for iceberg table " + table.name()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
|
||||
return icebergSource.getIcebergTable().spec().fields().stream().map(PartitionField::name)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
|
||||
TFileFormatType type;
|
||||
String icebergFormat = icebergSource.getFileFormat();
|
||||
if (icebergFormat.equalsIgnoreCase("parquet")) {
|
||||
type = TFileFormatType.FORMAT_PARQUET;
|
||||
} else if (icebergFormat.equalsIgnoreCase("orc")) {
|
||||
type = TFileFormatType.FORMAT_ORC;
|
||||
} else {
|
||||
throw new DdlException(String.format("Unsupported format name: %s for iceberg table.", icebergFormat));
|
||||
}
|
||||
return type;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException {
|
||||
return icebergSource.getCatalog().getProperties();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableIf getTargetTable() {
|
||||
return icebergSource.getTargetTable();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TFileAttributes getFileAttributes() throws UserException {
|
||||
return icebergSource.getFileAttributes();
|
||||
}
|
||||
}
|
||||
@ -17,7 +17,6 @@
|
||||
|
||||
package org.apache.doris.planner.external.iceberg;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.planner.external.FileSplit;
|
||||
|
||||
import lombok.Data;
|
||||
@ -31,8 +30,6 @@ public class IcebergSplit extends FileSplit {
|
||||
super(file, start, length, fileLength, hosts, null);
|
||||
}
|
||||
|
||||
private Analyzer analyzer;
|
||||
private String dataFilePath;
|
||||
private Integer formatVersion;
|
||||
private List<IcebergDeleteFileFilter> deleteFileFilters;
|
||||
}
|
||||
|
||||
@ -15,17 +15,15 @@
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.planner;
|
||||
package org.apache.doris.spi;
|
||||
|
||||
import lombok.Data;
|
||||
/**
|
||||
* Split interface. e.g. Tablet for Olap Table.
|
||||
*/
|
||||
public interface Split {
|
||||
|
||||
@Data
|
||||
public abstract class Split {
|
||||
protected String[] hosts;
|
||||
String[] getHosts();
|
||||
|
||||
public Split() {}
|
||||
Object getInfo();
|
||||
|
||||
public Split(String[] hosts) {
|
||||
this.hosts = hosts;
|
||||
}
|
||||
}
|
||||
@ -31,6 +31,8 @@ public enum StatisticalType {
|
||||
HASH_JOIN_NODE,
|
||||
HIVE_SCAN_NODE,
|
||||
ICEBERG_SCAN_NODE,
|
||||
HUDI_SCAN_NODE,
|
||||
TVF_SCAN_NODE,
|
||||
INTERSECT_NODE,
|
||||
LOAD_SCAN_NODE,
|
||||
MYSQL_SCAN_NODE,
|
||||
|
||||
@ -37,7 +37,7 @@ import org.apache.doris.common.util.BrokerUtil;
|
||||
import org.apache.doris.datasource.property.constants.S3Properties;
|
||||
import org.apache.doris.planner.PlanNodeId;
|
||||
import org.apache.doris.planner.ScanNode;
|
||||
import org.apache.doris.planner.external.FileQueryScanNode;
|
||||
import org.apache.doris.planner.external.TVFScanNode;
|
||||
import org.apache.doris.proto.InternalService;
|
||||
import org.apache.doris.proto.InternalService.PFetchTableSchemaRequest;
|
||||
import org.apache.doris.proto.Types.PScalarType;
|
||||
@ -317,7 +317,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
|
||||
|
||||
@Override
|
||||
public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
|
||||
return new FileQueryScanNode(id, desc, false);
|
||||
return new TVFScanNode(id, desc, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user