diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java index 00139913d9..ed7ba00dd6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java @@ -161,7 +161,7 @@ public class DescriptorTable { // but its table has no id if (tupleD.getTable() != null && tupleD.getTable().getId() >= 0) { - referencedTbls.add(tupleD.getTable()); + referencedTbls.add((Table) tupleD.getTable()); } for (SlotDescriptor slotD : tupleD.getMaterializedSlots()) { result.addToSlotDescriptors(slotD.toThrift()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java index 9cdd0b23b7..1eb41ae6cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java @@ -329,10 +329,7 @@ public class SlotDescriptor { } public boolean isScanSlot() { - Table table = parent.getTable(); - if ((table != null) && (table instanceof OlapTable)) { - return true; - } - return false; + Table table = (Table) parent.getTable(); + return table instanceof OlapTable; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java index 6e8017e3bd..706d82e79c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java @@ -373,7 +373,7 @@ public class SlotRef extends Expr { expr.getTableIdToColumnNames(tableIdToColumnNames); } } else { - Table table = desc.getParent().getTable(); + Table table = (Table) desc.getParent().getTable(); if (table == null) { // Maybe this column comes from inline view. 
return; @@ -390,8 +390,7 @@ public class SlotRef extends Expr { public Table getTable() { Preconditions.checkState(desc != null); - Table table = desc.getParent().getTable(); - return table; + return (Table) desc.getParent().getTable(); } public void setLabel(String label) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java index 1d15ea6fe4..107a9a3637 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java @@ -301,7 +301,7 @@ public class TableRef implements ParseNode, Writable { } public Table getTable() { - return desc.getTable(); + return (Table) desc.getTable(); } public void setUsingClause(List colNames) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java index 5bfc261713..96aaa615cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java @@ -23,6 +23,7 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.ColumnStats; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; import org.apache.doris.thrift.TTupleDescriptor; import com.google.common.base.Joiner; @@ -45,7 +46,7 @@ public class TupleDescriptor { private final ArrayList slots; // underlying table, if there is one - private Table table; + private TableIf table; // underlying table, if there is one private TableRef ref; @@ -151,11 +152,11 @@ public class TupleDescriptor { return null; } - public Table getTable() { + public TableIf getTable() { return table; } - public void setTable(Table tbl) { + public void setTable(TableIf tbl) { table = tbl; } @@ -352,7 +353,7 @@ public class TupleDescriptor { if (slotDescriptor.getColumn() != null) { TupleDescriptor parent = slotDescriptor.getParent(); Preconditions.checkState(parent != null); - Table table = parent.getTable(); + Table table = (Table) parent.getTable(); Preconditions.checkState(table != null); Long tableId = table.getId(); Set columnNames = tableIdToColumnNames.get(tableId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java index a575ab110b..eb748ed807 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java @@ -171,7 +171,6 @@ public class IcebergTable extends Table { return fileFormat; } - // get the iceberg table instance, if table is not loaded, load it. private org.apache.iceberg.Table getTable() throws Exception { if (isLoaded.get()) { Preconditions.checkNotNull(icebergTable); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index 8c5ea18d06..c4167b89df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -131,4 +131,11 @@ public class HMSExternalTable extends ExternalTable { } return null; } + + /** + * get database name of hms table. 
+ */ + public String getDbName() { + return dbName; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index 38a9e88e3c..9192c0981f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -363,13 +363,13 @@ public class HashJoinNode extends PlanNode { } public double lhsNumRows() { - Table table = lhs.getParent().getTable(); + Table table = (Table) lhs.getParent().getTable(); Preconditions.checkState(table instanceof OlapTable); return ((OlapTable) (table)).getRowCount(); } public double rhsNumRows() { - Table table = rhs.getParent().getTable(); + Table table = (Table) rhs.getParent().getTable(); Preconditions.checkState(table instanceof OlapTable); return ((OlapTable) (table)).getRowCount(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java new file mode 100644 index 0000000000..8be039b30f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java @@ -0,0 +1,407 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.NullLiteral; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StringLiteral; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.mysql.privilege.UserProperty; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.resource.Tag; +import org.apache.doris.system.Backend; +import org.apache.doris.system.BeSelectionPolicy; +import org.apache.doris.thrift.TBrokerRangeDesc; +import org.apache.doris.thrift.TBrokerScanNode; +import org.apache.doris.thrift.TBrokerScanRange; +import org.apache.doris.thrift.TBrokerScanRangeParams; +import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.THdfsParams; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TPlanNodeType; +import org.apache.doris.thrift.TScanRange; +import org.apache.doris.thrift.TScanRangeLocation; +import org.apache.doris.thrift.TScanRangeLocations; + +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.mortbay.log.Log; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +/** + * ExternalFileScanNode for the file access type of datasource. Currently only Hive, Hudi and Iceberg are supported.
+ */ +public class ExternalFileScanNode extends ExternalScanNode { + private static final Logger LOG = LogManager.getLogger(ExternalFileScanNode.class); + + private static final String HIVE_DEFAULT_COLUMN_SEPARATOR = "\001"; + + private static final String HIVE_DEFAULT_LINE_DELIMITER = "\n"; + + private static class ParamCreateContext { + public TBrokerScanRangeParams params; + public TupleDescriptor srcTupleDescriptor; + public Map slotDescByName; + } + + private static class BackendPolicy { + private final List backends = Lists.newArrayList(); + + private int nextBe = 0; + + public void init() throws UserException { + Set tags = Sets.newHashSet(); + if (ConnectContext.get().getCurrentUserIdentity() != null) { + String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser(); + tags = Catalog.getCurrentCatalog().getAuth().getResourceTags(qualifiedUser); + if (tags == UserProperty.INVALID_RESOURCE_TAGS) { + throw new UserException("No valid resource tag for user: " + qualifiedUser); + } + } else { + LOG.debug("user info in ExternalFileScanNode should not be null, add log to observer"); + } + + // scan node is used for query + BeSelectionPolicy policy = new BeSelectionPolicy.Builder() + .needQueryAvailable() + .needLoadAvailable() + .addTags(tags) + .build(); + for (Backend be : Catalog.getCurrentSystemInfo().getIdToBackend().values()) { + if (policy.isMatch(be)) { + backends.add(be); + } + } + if (backends.isEmpty()) { + throw new UserException("No available backends"); + } + Random random = new Random(System.currentTimeMillis()); + Collections.shuffle(backends, random); + } + + public Backend getNextBe() { + Backend selectedBackend = backends.get(nextBe++); + nextBe = nextBe % backends.size(); + return selectedBackend; + } + } + + private enum DLAType { + HIVE, + HUDI, + ICE_BERG + } + + private final BackendPolicy backendPolicy = new BackendPolicy(); + + private final ParamCreateContext context = new ParamCreateContext(); + + private List scanRangeLocations; + + private final HMSExternalTable hmsTable; + + private ExternalFileScanProvider scanProvider; + + /** + * External file scan node for hms table. 
+ */ + public ExternalFileScanNode( + PlanNodeId id, + TupleDescriptor desc, + String planNodeName) throws MetaNotFoundException { + super(id, desc, planNodeName, NodeType.BROKER_SCAN_NODE); + + this.hmsTable = (HMSExternalTable) desc.getTable(); + + DLAType type = getDLAType(); + switch (type) { + case HUDI: + this.scanProvider = new ExternalHudiScanProvider(this.hmsTable); + break; + case ICE_BERG: + this.scanProvider = new ExternalIcebergScanProvider(this.hmsTable); + break; + case HIVE: + this.scanProvider = new ExternalHiveScanProvider(this.hmsTable); + break; + default: + LOG.warn("Unknown table for dla."); + } + } + + private DLAType getDLAType() throws MetaNotFoundException { + if (hmsTable.getRemoteTable().getParameters().containsKey("table_type") + && hmsTable.getRemoteTable().getParameters().get("table_type").equalsIgnoreCase("ICEBERG")) { + return DLAType.ICE_BERG; + } else if (hmsTable.getRemoteTable().getSd().getInputFormat().toLowerCase().contains("hoodie")) { + return DLAType.HUDI; + } else { + return DLAType.HIVE; + } + } + + @Override + public void init(Analyzer analyzer) throws UserException { + super.init(analyzer); + backendPolicy.init(); + initContext(context); + } + + private void initContext(ParamCreateContext context) throws DdlException, MetaNotFoundException { + context.srcTupleDescriptor = analyzer.getDescTbl().createTupleDescriptor(); + context.slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + context.params = new TBrokerScanRangeParams(); + if (scanProvider.getTableFormatType().equals(TFileFormatType.FORMAT_CSV_PLAIN)) { + Map serDeInfoParams = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters(); + String columnSeparator = Strings.isNullOrEmpty(serDeInfoParams.get("field.delim")) + ? HIVE_DEFAULT_COLUMN_SEPARATOR : serDeInfoParams.get("field.delim"); + String lineDelimiter = Strings.isNullOrEmpty(serDeInfoParams.get("line.delim")) + ? 
HIVE_DEFAULT_LINE_DELIMITER : serDeInfoParams.get("line.delim"); + context.params.setColumnSeparator(columnSeparator.getBytes(StandardCharsets.UTF_8)[0]); + context.params.setLineDelimiter(lineDelimiter.getBytes(StandardCharsets.UTF_8)[0]); + context.params.setColumnSeparatorStr(columnSeparator); + context.params.setLineDelimiterStr(lineDelimiter); + context.params.setColumnSeparatorLength(columnSeparator.getBytes(StandardCharsets.UTF_8).length); + context.params.setLineDelimiterLength(lineDelimiter.getBytes(StandardCharsets.UTF_8).length); + } + + Map slotDescByName = Maps.newHashMap(); + + List columns = hmsTable.getBaseSchema(false); + for (Column column : columns) { + SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(context.srcTupleDescriptor); + slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR)); + slotDesc.setIsMaterialized(true); + slotDesc.setIsNullable(true); + slotDesc.setColumn(new Column(column.getName(), PrimitiveType.VARCHAR)); + context.params.addToSrcSlotIds(slotDesc.getId().asInt()); + slotDescByName.put(column.getName(), slotDesc); + } + context.slotDescByName = slotDescByName; + } + + @Override + public void finalize(Analyzer analyzer) throws UserException { + try { + finalizeParams(context.slotDescByName, context.params, context.srcTupleDescriptor); + buildScanRange(); + } catch (IOException e) { + LOG.error("Finalize failed.", e); + throw new UserException("Finalize failed.", e); + } + } + + // If fileFormat is not null, we use fileFormat instead of checking the file's suffix + private void buildScanRange() throws UserException, IOException { + scanRangeLocations = Lists.newArrayList(); + InputSplit[] inputSplits = scanProvider.getSplits(conjuncts); + if (0 == inputSplits.length) { + return; + } + + THdfsParams hdfsParams = new THdfsParams(); + String fullPath = ((FileSplit) inputSplits[0]).getPath().toUri().toString(); + String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath(); + String fsName = fullPath.replace(filePath, ""); + hdfsParams.setFsName(fsName); + List partitionKeys = new ArrayList<>(); + for (FieldSchema fieldSchema : hmsTable.getRemoteTable().getPartitionKeys()) { + partitionKeys.add(fieldSchema.getName()); + } + + for (InputSplit split : inputSplits) { + FileSplit fileSplit = (FileSplit) split; + + TScanRangeLocations curLocations = newLocations(context.params); + List partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), + partitionKeys); + int numberOfColumnsFromFile = context.slotDescByName.size() - partitionValuesFromPath.size(); + + TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit, partitionValuesFromPath, + numberOfColumnsFromFile); + rangeDesc.setHdfsParams(hdfsParams); + rangeDesc.setReadByColumnDef(true); + + curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc); + Log.debug("Assign to backend " + curLocations.getLocations().get(0).getBackendId() + + " with table split: " + fileSplit.getPath() + + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")"); + + // Put the last file + if (curLocations.getScanRange().getBrokerScanRange().isSetRanges()) { + scanRangeLocations.add(curLocations); + } + } + } + + private TScanRangeLocations newLocations(TBrokerScanRangeParams params) { + // Generate one broker scan range + TBrokerScanRange brokerScanRange = new TBrokerScanRange(); + brokerScanRange.setParams(params); + brokerScanRange.setBrokerAddresses(new ArrayList<>()); + + // Scan range + TScanRange scanRange = new TScanRange(); +
scanRange.setBrokerScanRange(brokerScanRange); + + // Locations + TScanRangeLocations locations = new TScanRangeLocations(); + locations.setScanRange(scanRange); + + TScanRangeLocation location = new TScanRangeLocation(); + Backend selectedBackend = backendPolicy.getNextBe(); + location.setBackendId(selectedBackend.getId()); + location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort())); + locations.addToLocations(location); + + return locations; + } + + private TBrokerRangeDesc createBrokerRangeDesc( + FileSplit fileSplit, + List columnsFromPath, + int numberOfColumnsFromFile) throws DdlException, MetaNotFoundException { + TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc(); + rangeDesc.setFileType(scanProvider.getTableFileType()); + rangeDesc.setFormatType(scanProvider.getTableFormatType()); + rangeDesc.setPath(fileSplit.getPath().toUri().getPath()); + rangeDesc.setSplittable(true); + rangeDesc.setStartOffset(fileSplit.getStart()); + rangeDesc.setSize(fileSplit.getLength()); + rangeDesc.setNumOfColumnsFromFile(numberOfColumnsFromFile); + rangeDesc.setColumnsFromPath(columnsFromPath); + // set hdfs params for hdfs file type. + if (scanProvider.getTableFileType() == TFileType.FILE_HDFS) { + BrokerUtil.generateHdfsParam(scanProvider.getTableProperties(), rangeDesc); + } + return rangeDesc; + } + + private void finalizeParams( + Map slotDescByName, + TBrokerScanRangeParams params, + TupleDescriptor srcTupleDesc) throws UserException { + Map destSidToSrcSidWithoutTrans = Maps.newHashMap(); + for (SlotDescriptor destSlotDesc : desc.getSlots()) { + Expr expr; + SlotDescriptor srcSlotDesc = slotDescByName.get(destSlotDesc.getColumn().getName()); + if (srcSlotDesc != null) { + destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt()); + // If dest is allow null, we set source to nullable + if (destSlotDesc.getColumn().isAllowNull()) { + srcSlotDesc.setIsNullable(true); + } + expr = new SlotRef(srcSlotDesc); + } else { + Column column = destSlotDesc.getColumn(); + if (column.getDefaultValue() != null) { + expr = new StringLiteral(destSlotDesc.getColumn().getDefaultValue()); + } else { + if (column.isAllowNull()) { + expr = NullLiteral.create(column.getType()); + } else { + throw new AnalysisException("column has no source field, column=" + column.getName()); + } + } + } + + expr = castToSlot(destSlotDesc, expr); + params.putToExprOfDestSlot(destSlotDesc.getId().asInt(), expr.treeToThrift()); + } + params.setDestSidToSrcSidWithoutTrans(destSidToSrcSidWithoutTrans); + params.setDestTupleId(desc.getId().asInt()); + params.setStrictMode(false); + params.setSrcTupleId(srcTupleDesc.getId().asInt()); + + // Need re compute memory layout after set some slot descriptor to nullable + srcTupleDesc.computeStatAndMemLayout(); + } + + @Override + public int getNumInstances() { + return scanRangeLocations.size(); + } + + @Override + protected void toThrift(TPlanNode planNode) { + planNode.setNodeType(TPlanNodeType.BROKER_SCAN_NODE); + TBrokerScanNode brokerScanNode = new TBrokerScanNode(desc.getId().asInt()); + if (!preFilterConjuncts.isEmpty()) { + if (Config.enable_vectorized_load && vpreFilterConjunct != null) { + brokerScanNode.addToPreFilterExprs(vpreFilterConjunct.treeToThrift()); + } else { + for (Expr e : preFilterConjuncts) { + brokerScanNode.addToPreFilterExprs(e.treeToThrift()); + } + } + } + planNode.setBrokerScanNode(brokerScanNode); + } + + @Override + public List getScanRangeLocations(long maxScanRangeLength) { + return 
scanRangeLocations; + } + + @Override + public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { + String url; + try { + url = scanProvider.getMetaStoreUrl(); + } catch (MetaNotFoundException e) { + LOG.warn("Can't get url error", e); + url = "Can't get url error."; + } + return prefix + "DATABASE: " + hmsTable.getDbName() + "\n" + + prefix + "TABLE: " + hmsTable.getName() + "\n" + + prefix + "HIVE URL: " + url + "\n"; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java new file mode 100644 index 0000000000..2fc626ab70 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileType; + +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.mapred.InputSplit; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * An interface for the file scan node to get the needed information. + */ +public interface ExternalFileScanProvider { + TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException; + + TFileType getTableFileType(); + + String getMetaStoreUrl() throws MetaNotFoundException; + + InputSplit[] getSplits(List exprs) throws IOException, UserException; + + Table getRemoteHiveTable() throws DdlException, MetaNotFoundException; + + Map getTableProperties() throws MetaNotFoundException; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java new file mode 100644 index 0000000000..979a5a29d3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.catalog.HiveMetaStoreClientHelper; +import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.external.hive.util.HiveUtil; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileType; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * A HiveScanProvider to get information for scan node. + */ +public class ExternalHiveScanProvider implements ExternalFileScanProvider { + protected HMSExternalTable hmsTable; + + public ExternalHiveScanProvider(HMSExternalTable hmsTable) { + this.hmsTable = hmsTable; + } + + @Override + public TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException { + TFileFormatType type = null; + String inputFormatName = getRemoteHiveTable().getSd().getInputFormat(); + String hiveFormat = HiveMetaStoreClientHelper.HiveFileFormat.getFormat(inputFormatName); + if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.PARQUET.getDesc())) { + type = TFileFormatType.FORMAT_PARQUET; + } else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.ORC.getDesc())) { + type = TFileFormatType.FORMAT_ORC; + } else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.TEXT_FILE.getDesc())) { + type = TFileFormatType.FORMAT_CSV_PLAIN; + } + return type; + } + + @Override + public TFileType getTableFileType() { + return TFileType.FILE_HDFS; + } + + @Override + public String getMetaStoreUrl() throws MetaNotFoundException { + return getTableProperties().get(HiveConf.ConfVars.METASTOREURIS.name()); + } + + @Override + public InputSplit[] getSplits(List exprs) + throws IOException, UserException { + String splitsPath = getRemoteHiveTable().getSd().getLocation(); + List partitionKeys = getRemoteHiveTable().getPartitionKeys() + .stream().map(FieldSchema::getName).collect(Collectors.toList()); + + if (partitionKeys.size() > 0) { + ExprNodeGenericFuncDesc hivePartitionPredicate = extractHivePartitionPredicate(exprs, partitionKeys); + + String metaStoreUris = getMetaStoreUrl(); + List hivePartitions = HiveMetaStoreClientHelper.getHivePartitions( + metaStoreUris, getRemoteHiveTable(), hivePartitionPredicate); + 
if (!hivePartitions.isEmpty()) { + splitsPath = hivePartitions.stream().map(x -> x.getSd().getLocation()) + .collect(Collectors.joining(",")); + } + } + + String inputFormatName = getRemoteHiveTable().getSd().getInputFormat(); + + Configuration configuration = new Configuration(); + InputFormat inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, false); + JobConf jobConf = new JobConf(configuration); + FileInputFormat.setInputPaths(jobConf, splitsPath); + return inputFormat.getSplits(jobConf, 0); + } + + + private ExprNodeGenericFuncDesc extractHivePartitionPredicate(List conjuncts, List partitionKeys) + throws DdlException { + ExprNodeGenericFuncDesc hivePartitionPredicate; + List exprNodeDescs = new ArrayList<>(); + for (Expr conjunct : conjuncts) { + ExprNodeGenericFuncDesc hiveExpr = HiveMetaStoreClientHelper.convertToHivePartitionExpr( + conjunct, partitionKeys, hmsTable.getName()); + if (hiveExpr != null) { + exprNodeDescs.add(hiveExpr); + } + } + int count = exprNodeDescs.size(); + + if (count >= 2) { + hivePartitionPredicate = HiveMetaStoreClientHelper.getCompoundExpr(exprNodeDescs, "and"); + } else if (count == 1) { + hivePartitionPredicate = (ExprNodeGenericFuncDesc) exprNodeDescs.get(0); + } else { + HiveMetaStoreClientHelper.ExprBuilder exprBuilder = + new HiveMetaStoreClientHelper.ExprBuilder(hmsTable.getName()); + hivePartitionPredicate = exprBuilder.val(TypeInfoFactory.intTypeInfo, 1) + .val(TypeInfoFactory.intTypeInfo, 1) + .pred("=", 2).build(); + } + return hivePartitionPredicate; + } + + @Override + public Table getRemoteHiveTable() throws DdlException, MetaNotFoundException { + return hmsTable.getRemoteTable(); + } + + @Override + public Map getTableProperties() throws MetaNotFoundException { + return hmsTable.getRemoteTable().getParameters(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java new file mode 100644 index 0000000000..2951c1fb7a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external; + +import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.common.DdlException; +import org.apache.doris.thrift.TFileFormatType; + +/** + * A file scan provider for hudi. + * The Hudi provider extends the Hive provider since they both use the InputFormat interface to get the splits.
+ */ +public class ExternalHudiScanProvider extends ExternalHiveScanProvider { + + public ExternalHudiScanProvider(HMSExternalTable hmsTable) { + super(hmsTable); + } + + @Override + public TFileFormatType getTableFormatType() throws DdlException { + return TFileFormatType.FORMAT_PARQUET; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java new file mode 100644 index 0000000000..28ba3dad0a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.catalog.IcebergProperty; +import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.external.iceberg.HiveCatalog; +import org.apache.doris.external.iceberg.util.IcebergUtils; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileType; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.expressions.Expression; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * A file scan provider for iceberg. 
+ */ +public class ExternalIcebergScanProvider extends ExternalHiveScanProvider { + + public ExternalIcebergScanProvider(HMSExternalTable hmsTable) { + super(hmsTable); + } + + @Override + public TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException { + TFileFormatType type; + + String icebergFormat = getRemoteHiveTable().getParameters() + .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); + if (icebergFormat.equals("parquet")) { + type = TFileFormatType.FORMAT_PARQUET; + } else if (icebergFormat.equals("orc")) { + type = TFileFormatType.FORMAT_ORC; + } else { + throw new DdlException(String.format("Unsupported format name: %s for iceberg table.", icebergFormat)); + } + return type; + } + + @Override + public TFileType getTableFileType() { + return TFileType.FILE_HDFS; + } + + @Override + public InputSplit[] getSplits(List exprs) throws IOException, UserException { + List expressions = new ArrayList<>(); + for (Expr conjunct : exprs) { + Expression expression = IcebergUtils.convertToIcebergExpr(conjunct); + if (expression != null) { + expressions.add(expression); + } + } + + org.apache.iceberg.Table table = getIcebergTable(); + TableScan scan = table.newScan(); + for (Expression predicate : expressions) { + scan = scan.filter(predicate); + } + List splits = new ArrayList<>(); + + for (FileScanTask task : scan.planFiles()) { + for (FileScanTask spitTask : task.split(128 * 1024 * 1024)) { + splits.add(new FileSplit(new Path(spitTask.file().path().toString()), + spitTask.start(), spitTask.length(), new String[0])); + } + } + return splits.toArray(new InputSplit[0]); + } + + private org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException { + HiveCatalog hiveCatalog = new HiveCatalog(); + hiveCatalog.initialize(new IcebergProperty(getTableProperties())); + return hiveCatalog.loadTable(TableIdentifier.of(hmsTable.getDbName(), hmsTable.getName())); + } +}