[feature-wip](multi-catalog) Impl FileScanNode in be (#10402)
Define a new file scan node for HMS tables in BE.
This file scan node differs from the broker scan node as follows:
1. The broker scan node defines both src slots and dest slots, so each row is copied twice: first from the file to the src slots,
then from the src slots to the dest slots. FileScanNode needs only a single memory copy, directly from the file to the dest slots.
2. The broker scan node reads every field in the file into the src slots, while FileScanNode reads only the required fields.
3. The broker scan node converts values to string for the src slots and then casts them to the dest slot types,
while FileScanNode produces the final types directly.
For now FileScanNode is standalone code; the file scan and broker scan paths will be unified in the future.
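To illustrate points 1 and 2, here is a minimal sketch of how the FE side could describe only the required slots to the BE. It is simplified from the `initContext()` change in this patch; the class and method names (`FileScanParamsSketch`, `buildParams`) are illustrative only, and in the real code the slot ids come from a dedicated src tuple whose slots already carry the final column types.

```java
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFileScanSlotInfo;

import java.util.List;

public class FileScanParamsSketch {
    /**
     * Sketch: describe only the slots the query actually needs, with their final types.
     * The BE then reads just those columns from the file and writes them straight into
     * the dest tuple, so there is no intermediate string row and no extra cast step.
     */
    static TFileScanRangeParams buildParams(TupleDescriptor destTuple, List<String> partitionKeys) {
        TFileScanRangeParams params = new TFileScanRangeParams();
        for (SlotDescriptor slot : destTuple.getSlots()) {
            TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();
            slotInfo.setSlotId(slot.getId().asInt());
            // Partition columns are parsed from the file path, not read from the file content.
            slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName()));
            params.addToRequiredSlots(slotInfo);
        }
        return params;
    }
}
```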
@ -20,7 +20,7 @@
|
||||
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.IdGenerator;
|
||||
import org.apache.doris.thrift.TDescriptorTable;
|
||||
|
||||
@ -46,7 +46,7 @@ public class DescriptorTable {
|
||||
private final HashMap<TupleId, TupleDescriptor> tupleDescs = new HashMap<TupleId, TupleDescriptor>();
|
||||
// List of referenced tables with no associated TupleDescriptor to ship to the BE.
|
||||
// For example, the output table of an insert query.
|
||||
private final List<Table> referencedTables = new ArrayList<Table>();
|
||||
private final List<TableIf> referencedTables = new ArrayList<TableIf>();
|
||||
private final IdGenerator<TupleId> tupleIdGenerator = TupleId.createGenerator();
|
||||
private final IdGenerator<SlotId> slotIdGenerator = SlotId.createGenerator();
|
||||
private final HashMap<SlotId, SlotDescriptor> slotDescs = Maps.newHashMap();
|
||||
@ -121,7 +121,7 @@ public class DescriptorTable {
|
||||
return tupleDescs.values();
|
||||
}
|
||||
|
||||
public void addReferencedTable(Table table) {
|
||||
public void addReferencedTable(TableIf table) {
|
||||
referencedTables.add(table);
|
||||
}
|
||||
|
||||
@ -151,7 +151,7 @@ public class DescriptorTable {
|
||||
|
||||
public TDescriptorTable toThrift() {
|
||||
TDescriptorTable result = new TDescriptorTable();
|
||||
HashSet<Table> referencedTbls = Sets.newHashSet();
|
||||
HashSet<TableIf> referencedTbls = Sets.newHashSet();
|
||||
for (TupleDescriptor tupleD : tupleDescs.values()) {
|
||||
// inline view of a non-constant select has a non-materialized tuple descriptor
|
||||
// in the descriptor table just for type checking, which we need to skip
|
||||
@ -161,7 +161,7 @@ public class DescriptorTable {
|
||||
// but its table has no id
|
||||
if (tupleD.getTable() != null
|
||||
&& tupleD.getTable().getId() >= 0) {
|
||||
referencedTbls.add((Table) tupleD.getTable());
|
||||
referencedTbls.add(tupleD.getTable());
|
||||
}
|
||||
for (SlotDescriptor slotD : tupleD.getMaterializedSlots()) {
|
||||
result.addToSlotDescriptors(slotD.toThrift());
|
||||
@ -169,11 +169,9 @@ public class DescriptorTable {
|
||||
}
|
||||
}
|
||||
|
||||
for (Table table : referencedTables) {
|
||||
referencedTbls.add(table);
|
||||
}
|
||||
referencedTbls.addAll(referencedTables);
|
||||
|
||||
for (Table tbl : referencedTbls) {
|
||||
for (TableIf tbl : referencedTbls) {
|
||||
result.addToTableDescriptors(tbl.toThrift());
|
||||
}
|
||||
return result;
|
||||
|
||||
@ -22,7 +22,7 @@ package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.TableAliasGenerator;
|
||||
@ -1186,7 +1186,7 @@ public class StmtRewriter {
|
||||
}
|
||||
continue;
|
||||
}
|
||||
Table table = tableRef.getTable();
|
||||
TableIf table = tableRef.getTable();
|
||||
String dbName = tableRef.getName().getDb();
|
||||
if (dbName == null) {
|
||||
dbName = analyzer.getDefaultDb();
|
||||
|
||||
@ -21,7 +21,7 @@
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
@ -300,8 +300,8 @@ public class TableRef implements ParseNode, Writable {
|
||||
return !correlatedTupleIds.isEmpty();
|
||||
}
|
||||
|
||||
public Table getTable() {
|
||||
return (Table) desc.getTable();
|
||||
public TableIf getTable() {
|
||||
return desc.getTable();
|
||||
}
|
||||
|
||||
public void setUsingClause(List<String> colNames) {
|
||||
|
||||
@ -22,7 +22,6 @@ package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.catalog.ColumnStats;
|
||||
import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.thrift.TTupleDescriptor;
|
||||
|
||||
@ -353,7 +352,7 @@ public class TupleDescriptor {
|
||||
if (slotDescriptor.getColumn() != null) {
|
||||
TupleDescriptor parent = slotDescriptor.getParent();
|
||||
Preconditions.checkState(parent != null);
|
||||
Table table = (Table) parent.getTable();
|
||||
TableIf table = parent.getTable();
|
||||
Preconditions.checkState(table != null);
|
||||
Long tableId = table.getId();
|
||||
Set<String> columnNames = tableIdToColumnNames.get(tableId);
|
||||
|
||||
@ -698,6 +698,10 @@ public class HiveMetaStoreClientHelper {
|
||||
return Type.DATE;
|
||||
case "timestamp":
|
||||
return Type.DATETIME;
|
||||
case "float":
|
||||
return Type.FLOAT;
|
||||
case "double":
|
||||
return Type.DOUBLE;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
@ -20,6 +20,7 @@ package org.apache.doris.catalog;
|
||||
import org.apache.doris.alter.AlterCancelException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.thrift.TTableDescriptor;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -92,11 +93,14 @@ public interface TableIf {
|
||||
|
||||
String getComment(boolean escapeQuota);
|
||||
|
||||
public TTableDescriptor toThrift();
|
||||
|
||||
/**
|
||||
* Doris table type.
|
||||
*/
|
||||
public enum TableType {
|
||||
MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI, TABLE_VALUED_FUNCTION;
|
||||
MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI,
|
||||
TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE;
|
||||
|
||||
public String toEngineName() {
|
||||
switch (this) {
|
||||
@ -122,6 +126,8 @@ public interface TableIf {
|
||||
return "Hudi";
|
||||
case TABLE_VALUED_FUNCTION:
|
||||
return "Table_Valued_Function";
|
||||
case HMS_EXTERNAL_TABLE:
|
||||
return "hms";
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
@ -143,6 +149,7 @@ public interface TableIf {
|
||||
case HIVE:
|
||||
case HUDI:
|
||||
case TABLE_VALUED_FUNCTION:
|
||||
case HMS_EXTERNAL_TABLE:
|
||||
return "EXTERNAL TABLE";
|
||||
default:
|
||||
return null;
|
||||
|
||||
@ -22,6 +22,7 @@ import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.thrift.TTableDescriptor;
|
||||
|
||||
import org.apache.commons.lang.NotImplementedException;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -249,5 +250,10 @@ public class ExternalTable implements TableIf {
|
||||
@Override
|
||||
public String getComment(boolean escapeQuota) {
|
||||
return "";
|
||||
|
||||
}
|
||||
|
||||
public TTableDescriptor toThrift() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@ -21,11 +21,16 @@ import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.thrift.THiveTable;
|
||||
import org.apache.doris.thrift.TTableDescriptor;
|
||||
import org.apache.doris.thrift.TTableType;
|
||||
|
||||
import org.apache.hadoop.hive.metastore.api.FieldSchema;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -38,6 +43,13 @@ public class HMSExternalTable extends ExternalTable {
|
||||
private final String metastoreUri;
|
||||
private final String dbName;
|
||||
private org.apache.hadoop.hive.metastore.api.Table remoteTable = null;
|
||||
private DLAType dlaType = null;
|
||||
|
||||
public enum DLAType {
|
||||
HIVE,
|
||||
HUDI,
|
||||
ICEBERG
|
||||
}
|
||||
|
||||
/**
|
||||
* Create hive metastore external table.
|
||||
@ -51,6 +63,7 @@ public class HMSExternalTable extends ExternalTable {
|
||||
super(id, name);
|
||||
this.dbName = dbName;
|
||||
this.metastoreUri = uri;
|
||||
this.type = TableType.HMS_EXTERNAL_TABLE;
|
||||
init();
|
||||
}
|
||||
|
||||
@ -58,11 +71,11 @@ public class HMSExternalTable extends ExternalTable {
|
||||
getRemoteTable();
|
||||
if (remoteTable.getParameters().containsKey("table_type")
|
||||
&& remoteTable.getParameters().get("table_type").equalsIgnoreCase("ICEBERG")) {
|
||||
type = TableType.ICEBERG;
|
||||
dlaType = DLAType.ICEBERG;
|
||||
} else if (remoteTable.getSd().getInputFormat().toLowerCase().contains("hoodie")) {
|
||||
type = TableType.HUDI;
|
||||
dlaType = DLAType.HUDI;
|
||||
} else {
|
||||
type = TableType.HIVE;
|
||||
dlaType = DLAType.HIVE;
|
||||
}
|
||||
}
|
||||
|
||||
@ -92,6 +105,7 @@ public class HMSExternalTable extends ExternalTable {
|
||||
if (fullSchema == null) {
|
||||
synchronized (this) {
|
||||
if (fullSchema == null) {
|
||||
fullSchema = new ArrayList<>();
|
||||
try {
|
||||
for (FieldSchema field : HiveMetaStoreClientHelper.getSchema(dbName, name, metastoreUri)) {
|
||||
fullSchema.add(new Column(field.getName(),
|
||||
@ -191,4 +205,24 @@ public class HMSExternalTable extends ExternalTable {
|
||||
public String getDbName() {
|
||||
return dbName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the DLA type (hive/hudi/iceberg) so the scan node can pick the right scan provider.
|
||||
*/
|
||||
public DLAType getDlaType() {
|
||||
return dlaType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TTableDescriptor toThrift() {
|
||||
THiveTable tHiveTable = new THiveTable(dbName, name, new HashMap<>());
|
||||
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE,
|
||||
fullSchema.size(), 0, getName(), "");
|
||||
tTableDescriptor.setHiveTable(tHiveTable);
|
||||
return tTableDescriptor;
|
||||
}
|
||||
|
||||
public String getMetastoreUri() {
|
||||
return metastoreUri;
|
||||
}
|
||||
}
|
||||
|
||||
@ -49,7 +49,6 @@ import org.apache.doris.thrift.TBrokerOperationStatus;
|
||||
import org.apache.doris.thrift.TBrokerOperationStatusCode;
|
||||
import org.apache.doris.thrift.TBrokerPReadRequest;
|
||||
import org.apache.doris.thrift.TBrokerPWriteRequest;
|
||||
import org.apache.doris.thrift.TBrokerRangeDesc;
|
||||
import org.apache.doris.thrift.TBrokerReadResponse;
|
||||
import org.apache.doris.thrift.TBrokerRenamePathRequest;
|
||||
import org.apache.doris.thrift.TBrokerVersion;
|
||||
@ -91,25 +90,26 @@ public class BrokerUtil {
|
||||
public static String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
|
||||
public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
|
||||
|
||||
public static void generateHdfsParam(Map<String, String> properties, TBrokerRangeDesc rangeDesc) {
|
||||
rangeDesc.setHdfsParams(new THdfsParams());
|
||||
rangeDesc.hdfs_params.setHdfsConf(new ArrayList<>());
|
||||
public static THdfsParams generateHdfsParam(Map<String, String> properties) {
|
||||
THdfsParams tHdfsParams = new THdfsParams();
|
||||
tHdfsParams.setHdfsConf(new ArrayList<>());
|
||||
for (Map.Entry<String, String> property : properties.entrySet()) {
|
||||
if (property.getKey().equalsIgnoreCase(HADOOP_FS_NAME)) {
|
||||
rangeDesc.hdfs_params.setFsName(property.getValue());
|
||||
tHdfsParams.setFsName(property.getValue());
|
||||
} else if (property.getKey().equalsIgnoreCase(HADOOP_USER_NAME)) {
|
||||
rangeDesc.hdfs_params.setUser(property.getValue());
|
||||
tHdfsParams.setUser(property.getValue());
|
||||
} else if (property.getKey().equalsIgnoreCase(HADOOP_KERBEROS_PRINCIPAL)) {
|
||||
rangeDesc.hdfs_params.setHdfsKerberosPrincipal(property.getValue());
|
||||
tHdfsParams.setHdfsKerberosPrincipal(property.getValue());
|
||||
} else if (property.getKey().equalsIgnoreCase(HADOOP_KERBEROS_KEYTAB)) {
|
||||
rangeDesc.hdfs_params.setHdfsKerberosKeytab(property.getValue());
|
||||
tHdfsParams.setHdfsKerberosKeytab(property.getValue());
|
||||
} else {
|
||||
THdfsConf hdfsConf = new THdfsConf();
|
||||
hdfsConf.setKey(property.getKey());
|
||||
hdfsConf.setValue(property.getValue());
|
||||
rangeDesc.hdfs_params.hdfs_conf.add(hdfsConf);
|
||||
tHdfsParams.hdfs_conf.add(hdfsConf);
|
||||
}
|
||||
}
|
||||
return tHdfsParams;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -117,6 +117,10 @@ public class DataSourceMgr implements Writable {
|
||||
return dbNames;
|
||||
}
|
||||
|
||||
public DataSourceIf getExternalDatasource(String name) {
|
||||
return nameToCatalogs.get(name);
|
||||
}
|
||||
|
||||
private void writeLock() {
|
||||
lock.writeLock().lock();
|
||||
}
|
||||
|
||||
@ -48,8 +48,9 @@ public class HMSExternalDataSource extends ExternalDataSource {
|
||||
|
||||
//Cache of db name to db id.
|
||||
private ConcurrentHashMap<String, Long> dbNameToId = new ConcurrentHashMap();
|
||||
private AtomicLong nextId = new AtomicLong(0);
|
||||
private static final AtomicLong nextId = new AtomicLong(0);
|
||||
|
||||
private boolean initialized = false;
|
||||
protected String hiveMetastoreUris;
|
||||
protected HiveMetaStoreClient client;
|
||||
|
||||
@ -57,34 +58,21 @@ public class HMSExternalDataSource extends ExternalDataSource {
|
||||
* Default constructor for HMSExternalDataSource.
|
||||
*/
|
||||
public HMSExternalDataSource(String name, Map<String, String> props) {
|
||||
setName(name);
|
||||
getDsProperty().setProperties(props);
|
||||
setType("hms");
|
||||
}
|
||||
|
||||
/**
|
||||
* Hive metastore data source implementation.
|
||||
*
|
||||
* @param hiveMetastoreUris e.g. thrift://127.0.0.1:9083
|
||||
*/
|
||||
public HMSExternalDataSource(long id, String name, String type, DataSourceProperty dsProperty,
|
||||
String hiveMetastoreUris) throws DdlException {
|
||||
this.id = id;
|
||||
this.id = nextId.incrementAndGet();
|
||||
this.name = name;
|
||||
this.type = type;
|
||||
this.dsProperty = dsProperty;
|
||||
this.hiveMetastoreUris = hiveMetastoreUris;
|
||||
init();
|
||||
this.type = "hms";
|
||||
this.dsProperty = new DataSourceProperty();
|
||||
this.dsProperty.setProperties(props);
|
||||
this.hiveMetastoreUris = props.getOrDefault("hive.metastore.uris", "thrift://127.0.0.1:9083");
|
||||
}
|
||||
|
||||
private void init() throws DdlException {
|
||||
private void init() {
|
||||
HiveConf hiveConf = new HiveConf();
|
||||
hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreUris);
|
||||
try {
|
||||
client = new HiveMetaStoreClient(hiveConf);
|
||||
} catch (MetaException e) {
|
||||
LOG.warn("Failed to create HiveMetaStoreClient: {}", e.getMessage());
|
||||
throw new DdlException("Create HMSExternalDataSource failed.", e);
|
||||
}
|
||||
List<String> allDatabases;
|
||||
try {
|
||||
@ -102,8 +90,20 @@ public class HMSExternalDataSource extends ExternalDataSource {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The datasource cannot be initialized at creation time because an external datasource may depend on a third-party system.
* So make sure the client of that third-party system is initialized before any other method is called.
|
||||
*/
|
||||
private synchronized void makeSureInitialized() {
|
||||
if (!initialized) {
|
||||
init();
|
||||
initialized = true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> listDatabaseNames(SessionContext ctx) {
|
||||
makeSureInitialized();
|
||||
try {
|
||||
List<String> allDatabases = client.getAllDatabases();
|
||||
// Update the db name to id map.
|
||||
@ -119,6 +119,7 @@ public class HMSExternalDataSource extends ExternalDataSource {
|
||||
|
||||
@Override
|
||||
public List<String> listTableNames(SessionContext ctx, String dbName) {
|
||||
makeSureInitialized();
|
||||
try {
|
||||
return client.getAllTables(dbName);
|
||||
} catch (MetaException e) {
|
||||
@ -129,6 +130,7 @@ public class HMSExternalDataSource extends ExternalDataSource {
|
||||
|
||||
@Override
|
||||
public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
|
||||
makeSureInitialized();
|
||||
try {
|
||||
return client.tableExists(dbName, tblName);
|
||||
} catch (TException e) {
|
||||
@ -140,6 +142,7 @@ public class HMSExternalDataSource extends ExternalDataSource {
|
||||
@Nullable
|
||||
@Override
|
||||
public ExternalDatabase getDbNullable(String dbName) {
|
||||
makeSureInitialized();
|
||||
try {
|
||||
client.getDatabase(dbName);
|
||||
} catch (TException e) {
|
||||
@ -156,6 +159,7 @@ public class HMSExternalDataSource extends ExternalDataSource {
|
||||
@Nullable
|
||||
@Override
|
||||
public ExternalDatabase getDbNullable(long dbId) {
|
||||
makeSureInitialized();
|
||||
for (Map.Entry<String, Long> entry : dbNameToId.entrySet()) {
|
||||
if (entry.getValue() == dbId) {
|
||||
return new HMSExternalDatabase(this, dbId, entry.getKey(), hiveMetastoreUris);
|
||||
@ -230,6 +234,7 @@ public class HMSExternalDataSource extends ExternalDataSource {
|
||||
|
||||
@Override
|
||||
public List<Long> getDbIds() {
|
||||
makeSureInitialized();
|
||||
return Lists.newArrayList(dbNameToId.values());
|
||||
}
|
||||
}
|
||||
|
||||
@ -605,7 +605,8 @@ public class BrokerScanNode extends LoadScanNode {
|
||||
rangeDesc.setHeaderType(headerType);
|
||||
// set hdfs params for hdfs file type.
|
||||
if (brokerDesc.getFileType() == TFileType.FILE_HDFS) {
|
||||
BrokerUtil.generateHdfsParam(brokerDesc.getProperties(), rangeDesc);
|
||||
THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(brokerDesc.getProperties());
|
||||
rangeDesc.setHdfsParams(tHdfsParams);
|
||||
}
|
||||
return rangeDesc;
|
||||
}
|
||||
|
||||
@ -292,11 +292,9 @@ public class HudiScanNode extends BrokerScanNode {
|
||||
return;
|
||||
}
|
||||
|
||||
THdfsParams hdfsParams = new THdfsParams();
|
||||
String fullPath = ((FileSplit) inputSplits[0]).getPath().toUri().toString();
|
||||
String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath();
|
||||
String fsName = fullPath.replace(filePath, "");
|
||||
hdfsParams.setFsName(fsName);
|
||||
Log.debug("Hudi path's host is " + fsName);
|
||||
|
||||
TFileFormatType fileFormatType = null;
|
||||
@ -319,7 +317,7 @@ public class HudiScanNode extends BrokerScanNode {
|
||||
|
||||
TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit, fileFormatType,
|
||||
partitionValuesFromPath, numberOfColumnsFromFile, brokerDesc);
|
||||
rangeDesc.setHdfsParams(hdfsParams);
|
||||
rangeDesc.getHdfsParams().setFsName(fsName);
|
||||
rangeDesc.setReadByColumnDef(true);
|
||||
|
||||
curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc);
|
||||
@ -350,7 +348,8 @@ public class HudiScanNode extends BrokerScanNode {
|
||||
// set hdfs params for hdfs file type.
|
||||
switch (brokerDesc.getFileType()) {
|
||||
case FILE_HDFS:
|
||||
BrokerUtil.generateHdfsParam(brokerDesc.getProperties(), rangeDesc);
|
||||
THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(brokerDesc.getProperties());
|
||||
rangeDesc.setHdfsParams(tHdfsParams);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
||||
@ -63,6 +63,7 @@ import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.Reference;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.planner.external.ExternalFileScanNode;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicate;
|
||||
@ -1726,6 +1727,9 @@ public class SingleNodePlanner {
|
||||
scanNode = new TableValuedFunctionScanNode(ctx.getNextNodeId(), tblRef.getDesc(),
|
||||
"TableValuedFunctionScanNode", ((TableValuedFunctionRef) tblRef).getTableFunction());
|
||||
break;
|
||||
case HMS_EXTERNAL_TABLE:
|
||||
scanNode = new ExternalFileScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "HMS_FILE_SCAN_NODE");
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
@ -19,17 +19,11 @@ package org.apache.doris.planner.external;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.NullLiteral;
|
||||
import org.apache.doris.analysis.SlotDescriptor;
|
||||
import org.apache.doris.analysis.SlotRef;
|
||||
import org.apache.doris.analysis.StringLiteral;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
@ -42,12 +36,15 @@ import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.statistics.StatisticalType;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.BeSelectionPolicy;
|
||||
import org.apache.doris.thrift.TBrokerRangeDesc;
|
||||
import org.apache.doris.thrift.TBrokerScanNode;
|
||||
import org.apache.doris.thrift.TBrokerScanRange;
|
||||
import org.apache.doris.thrift.TBrokerScanRangeParams;
|
||||
import org.apache.doris.thrift.TExplainLevel;
|
||||
import org.apache.doris.thrift.TExternalScanRange;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
import org.apache.doris.thrift.TFileRangeDesc;
|
||||
import org.apache.doris.thrift.TFileScanNode;
|
||||
import org.apache.doris.thrift.TFileScanRange;
|
||||
import org.apache.doris.thrift.TFileScanRangeParams;
|
||||
import org.apache.doris.thrift.TFileScanSlotInfo;
|
||||
import org.apache.doris.thrift.TFileTextScanRangeParams;
|
||||
import org.apache.doris.thrift.TFileType;
|
||||
import org.apache.doris.thrift.THdfsParams;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
@ -61,7 +58,6 @@ import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.hadoop.hive.metastore.api.FieldSchema;
|
||||
import org.apache.hadoop.mapred.FileSplit;
|
||||
import org.apache.hadoop.mapred.InputSplit;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -69,7 +65,6 @@ import org.apache.logging.log4j.Logger;
|
||||
import org.mortbay.log.Log;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
@ -88,9 +83,8 @@ public class ExternalFileScanNode extends ExternalScanNode {
|
||||
private static final String HIVE_DEFAULT_LINE_DELIMITER = "\n";
|
||||
|
||||
private static class ParamCreateContext {
|
||||
public TBrokerScanRangeParams params;
|
||||
public TFileScanRangeParams params;
|
||||
public TupleDescriptor srcTupleDescriptor;
|
||||
public Map<String, SlotDescriptor> slotDescByName;
|
||||
}
|
||||
|
||||
private static class BackendPolicy {
|
||||
@ -135,16 +129,12 @@ public class ExternalFileScanNode extends ExternalScanNode {
|
||||
}
|
||||
}
|
||||
|
||||
private enum DLAType {
|
||||
HIVE,
|
||||
HUDI,
|
||||
ICE_BERG
|
||||
}
|
||||
|
||||
private final BackendPolicy backendPolicy = new BackendPolicy();
|
||||
|
||||
private final ParamCreateContext context = new ParamCreateContext();
|
||||
|
||||
private final List<String> partitionKeys = new ArrayList<>();
|
||||
|
||||
private List<TScanRangeLocations> scanRangeLocations;
|
||||
|
||||
private final HMSExternalTable hmsTable;
|
||||
@ -157,17 +147,17 @@ public class ExternalFileScanNode extends ExternalScanNode {
|
||||
public ExternalFileScanNode(
|
||||
PlanNodeId id,
|
||||
TupleDescriptor desc,
|
||||
String planNodeName) throws MetaNotFoundException {
|
||||
super(id, desc, planNodeName, StatisticalType.BROKER_SCAN_NODE);
|
||||
String planNodeName) {
|
||||
|
||||
this.hmsTable = (HMSExternalTable) desc.getTable();
|
||||
super(id, desc, planNodeName, StatisticalType.FILE_SCAN_NODE);
|
||||
|
||||
DLAType type = getDLAType();
|
||||
switch (type) {
|
||||
this.hmsTable = (HMSExternalTable) this.desc.getTable();
|
||||
|
||||
switch (this.hmsTable.getDlaType()) {
|
||||
case HUDI:
|
||||
this.scanProvider = new ExternalHudiScanProvider(this.hmsTable);
|
||||
break;
|
||||
case ICE_BERG:
|
||||
case ICEBERG:
|
||||
this.scanProvider = new ExternalIcebergScanProvider(this.hmsTable);
|
||||
break;
|
||||
case HIVE:
|
||||
@ -178,61 +168,63 @@ public class ExternalFileScanNode extends ExternalScanNode {
|
||||
}
|
||||
}
|
||||
|
||||
private DLAType getDLAType() throws MetaNotFoundException {
|
||||
if (hmsTable.getRemoteTable().getParameters().containsKey("table_type")
|
||||
&& hmsTable.getRemoteTable().getParameters().get("table_type").equalsIgnoreCase("ICEBERG")) {
|
||||
return DLAType.ICE_BERG;
|
||||
} else if (hmsTable.getRemoteTable().getSd().getInputFormat().toLowerCase().contains("hoodie")) {
|
||||
return DLAType.HUDI;
|
||||
} else {
|
||||
return DLAType.HIVE;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Analyzer analyzer) throws UserException {
|
||||
super.init(analyzer);
|
||||
backendPolicy.init();
|
||||
initContext(context);
|
||||
initContext();
|
||||
}
|
||||
|
||||
private void initContext(ParamCreateContext context) throws DdlException, MetaNotFoundException {
|
||||
private void initContext() throws DdlException, MetaNotFoundException {
|
||||
context.srcTupleDescriptor = analyzer.getDescTbl().createTupleDescriptor();
|
||||
context.slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
|
||||
context.params = new TBrokerScanRangeParams();
|
||||
context.params = new TFileScanRangeParams();
|
||||
if (scanProvider.getTableFormatType().equals(TFileFormatType.FORMAT_CSV_PLAIN)) {
|
||||
Map<String, String> serDeInfoParams = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters();
|
||||
String columnSeparator = Strings.isNullOrEmpty(serDeInfoParams.get("field.delim"))
|
||||
? HIVE_DEFAULT_COLUMN_SEPARATOR : serDeInfoParams.get("field.delim");
|
||||
String lineDelimiter = Strings.isNullOrEmpty(serDeInfoParams.get("line.delim"))
|
||||
? HIVE_DEFAULT_LINE_DELIMITER : serDeInfoParams.get("line.delim");
|
||||
context.params.setColumnSeparator(columnSeparator.getBytes(StandardCharsets.UTF_8)[0]);
|
||||
context.params.setLineDelimiter(lineDelimiter.getBytes(StandardCharsets.UTF_8)[0]);
|
||||
context.params.setColumnSeparatorStr(columnSeparator);
|
||||
context.params.setLineDelimiterStr(lineDelimiter);
|
||||
context.params.setColumnSeparatorLength(columnSeparator.getBytes(StandardCharsets.UTF_8).length);
|
||||
context.params.setLineDelimiterLength(lineDelimiter.getBytes(StandardCharsets.UTF_8).length);
|
||||
|
||||
TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
|
||||
textParams.setLineDelimiterStr(lineDelimiter);
|
||||
textParams.setColumnSeparatorStr(columnSeparator);
|
||||
|
||||
context.params.setTextParams(textParams);
|
||||
}
|
||||
|
||||
Map<String, SlotDescriptor> slotDescByName = Maps.newHashMap();
|
||||
context.params.setSrcTupleId(context.srcTupleDescriptor.getId().asInt());
|
||||
// Need re compute memory layout after set some slot descriptor to nullable
|
||||
context.srcTupleDescriptor.computeStatAndMemLayout();
|
||||
|
||||
Map<String, SlotDescriptor> slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
|
||||
|
||||
List<Column> columns = hmsTable.getBaseSchema(false);
|
||||
for (Column column : columns) {
|
||||
SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(context.srcTupleDescriptor);
|
||||
slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
|
||||
slotDesc.setType(column.getType());
|
||||
slotDesc.setIsMaterialized(true);
|
||||
slotDesc.setIsNullable(true);
|
||||
slotDesc.setColumn(new Column(column.getName(), PrimitiveType.VARCHAR));
|
||||
context.params.addToSrcSlotIds(slotDesc.getId().asInt());
|
||||
slotDesc.setColumn(new Column(column));
|
||||
slotDescByName.put(column.getName(), slotDesc);
|
||||
}
|
||||
context.slotDescByName = slotDescByName;
|
||||
|
||||
// Hive tables need partition values extracted from the file path, while hudi/iceberg tables keep partition fields in the file.
|
||||
partitionKeys.addAll(scanProvider.getPathPartitionKeys());
|
||||
context.params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size());
|
||||
for (SlotDescriptor slot : desc.getSlots()) {
|
||||
int slotId = slotDescByName.get(slot.getColumn().getName()).getId().asInt();
|
||||
|
||||
TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();
|
||||
slotInfo.setSlotId(slotId);
|
||||
slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName()));
|
||||
|
||||
context.params.addToRequiredSlots(slotInfo);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void finalize(Analyzer analyzer) throws UserException {
|
||||
try {
|
||||
finalizeParams(context.slotDescByName, context.params, context.srcTupleDescriptor);
|
||||
buildScanRange();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Finalize failed.", e);
|
||||
@ -248,50 +240,40 @@ public class ExternalFileScanNode extends ExternalScanNode {
|
||||
return;
|
||||
}
|
||||
|
||||
THdfsParams hdfsParams = new THdfsParams();
|
||||
String fullPath = ((FileSplit) inputSplits[0]).getPath().toUri().toString();
|
||||
String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath();
|
||||
String fsName = fullPath.replace(filePath, "");
|
||||
hdfsParams.setFsName(fsName);
|
||||
List<String> partitionKeys = new ArrayList<>();
|
||||
for (FieldSchema fieldSchema : hmsTable.getRemoteTable().getPartitionKeys()) {
|
||||
partitionKeys.add(fieldSchema.getName());
|
||||
}
|
||||
|
||||
// TODO: currently each split gets its own scan range; they could be merged as an optimization.
|
||||
for (InputSplit split : inputSplits) {
|
||||
FileSplit fileSplit = (FileSplit) split;
|
||||
|
||||
TScanRangeLocations curLocations = newLocations(context.params);
|
||||
List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
|
||||
partitionKeys);
|
||||
int numberOfColumnsFromFile = context.slotDescByName.size() - partitionValuesFromPath.size();
|
||||
|
||||
TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit, partitionValuesFromPath,
|
||||
numberOfColumnsFromFile);
|
||||
rangeDesc.setHdfsParams(hdfsParams);
|
||||
rangeDesc.setReadByColumnDef(true);
|
||||
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath);
|
||||
rangeDesc.getHdfsParams().setFsName(fsName);
|
||||
|
||||
curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc);
|
||||
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
|
||||
Log.debug("Assign to backend " + curLocations.getLocations().get(0).getBackendId()
|
||||
+ " with table split: " + fileSplit.getPath()
|
||||
+ " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")");
|
||||
|
||||
// Put the last file
|
||||
if (curLocations.getScanRange().getBrokerScanRange().isSetRanges()) {
|
||||
scanRangeLocations.add(curLocations);
|
||||
}
|
||||
scanRangeLocations.add(curLocations);
|
||||
}
|
||||
}
|
||||
|
||||
private TScanRangeLocations newLocations(TBrokerScanRangeParams params) {
|
||||
// Generate on broker scan range
|
||||
TBrokerScanRange brokerScanRange = new TBrokerScanRange();
|
||||
brokerScanRange.setParams(params);
|
||||
brokerScanRange.setBrokerAddresses(new ArrayList<>());
|
||||
private TScanRangeLocations newLocations(TFileScanRangeParams params) {
|
||||
// Generate on file scan range
|
||||
TFileScanRange fileScanRange = new TFileScanRange();
|
||||
fileScanRange.setParams(params);
|
||||
|
||||
// Scan range
|
||||
TExternalScanRange externalScanRange = new TExternalScanRange();
|
||||
externalScanRange.setFileScanRange(fileScanRange);
|
||||
TScanRange scanRange = new TScanRange();
|
||||
scanRange.setBrokerScanRange(brokerScanRange);
|
||||
scanRange.setExtScanRange(externalScanRange);
|
||||
|
||||
// Locations
|
||||
TScanRangeLocations locations = new TScanRangeLocations();
|
||||
@ -306,66 +288,24 @@ public class ExternalFileScanNode extends ExternalScanNode {
|
||||
return locations;
|
||||
}
|
||||
|
||||
private TBrokerRangeDesc createBrokerRangeDesc(
|
||||
private TFileRangeDesc createFileRangeDesc(
|
||||
FileSplit fileSplit,
|
||||
List<String> columnsFromPath,
|
||||
int numberOfColumnsFromFile) throws DdlException, MetaNotFoundException {
|
||||
TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc();
|
||||
List<String> columnsFromPath) throws DdlException, MetaNotFoundException {
|
||||
TFileRangeDesc rangeDesc = new TFileRangeDesc();
|
||||
rangeDesc.setFileType(scanProvider.getTableFileType());
|
||||
rangeDesc.setFormatType(scanProvider.getTableFormatType());
|
||||
rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
|
||||
rangeDesc.setSplittable(true);
|
||||
rangeDesc.setStartOffset(fileSplit.getStart());
|
||||
rangeDesc.setSize(fileSplit.getLength());
|
||||
rangeDesc.setNumOfColumnsFromFile(numberOfColumnsFromFile);
|
||||
rangeDesc.setColumnsFromPath(columnsFromPath);
|
||||
// set hdfs params for hdfs file type.
|
||||
if (scanProvider.getTableFileType() == TFileType.FILE_HDFS) {
|
||||
BrokerUtil.generateHdfsParam(scanProvider.getTableProperties(), rangeDesc);
|
||||
THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(scanProvider.getTableProperties());
|
||||
rangeDesc.setHdfsParams(tHdfsParams);
|
||||
}
|
||||
return rangeDesc;
|
||||
}
|
||||
|
||||
private void finalizeParams(
|
||||
Map<String, SlotDescriptor> slotDescByName,
|
||||
TBrokerScanRangeParams params,
|
||||
TupleDescriptor srcTupleDesc) throws UserException {
|
||||
Map<Integer, Integer> destSidToSrcSidWithoutTrans = Maps.newHashMap();
|
||||
for (SlotDescriptor destSlotDesc : desc.getSlots()) {
|
||||
Expr expr;
|
||||
SlotDescriptor srcSlotDesc = slotDescByName.get(destSlotDesc.getColumn().getName());
|
||||
if (srcSlotDesc != null) {
|
||||
destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt());
|
||||
// If dest is allow null, we set source to nullable
|
||||
if (destSlotDesc.getColumn().isAllowNull()) {
|
||||
srcSlotDesc.setIsNullable(true);
|
||||
}
|
||||
expr = new SlotRef(srcSlotDesc);
|
||||
} else {
|
||||
Column column = destSlotDesc.getColumn();
|
||||
if (column.getDefaultValue() != null) {
|
||||
expr = new StringLiteral(destSlotDesc.getColumn().getDefaultValue());
|
||||
} else {
|
||||
if (column.isAllowNull()) {
|
||||
expr = NullLiteral.create(column.getType());
|
||||
} else {
|
||||
throw new AnalysisException("column has no source field, column=" + column.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
expr = castToSlot(destSlotDesc, expr);
|
||||
params.putToExprOfDestSlot(destSlotDesc.getId().asInt(), expr.treeToThrift());
|
||||
}
|
||||
params.setDestSidToSrcSidWithoutTrans(destSidToSrcSidWithoutTrans);
|
||||
params.setDestTupleId(desc.getId().asInt());
|
||||
params.setStrictMode(false);
|
||||
params.setSrcTupleId(srcTupleDesc.getId().asInt());
|
||||
|
||||
// Need re compute memory layout after set some slot descriptor to nullable
|
||||
srcTupleDesc.computeStatAndMemLayout();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumInstances() {
|
||||
return scanRangeLocations.size();
|
||||
@ -373,18 +313,19 @@ public class ExternalFileScanNode extends ExternalScanNode {
|
||||
|
||||
@Override
|
||||
protected void toThrift(TPlanNode planNode) {
|
||||
planNode.setNodeType(TPlanNodeType.BROKER_SCAN_NODE);
|
||||
TBrokerScanNode brokerScanNode = new TBrokerScanNode(desc.getId().asInt());
|
||||
planNode.setNodeType(TPlanNodeType.FILE_SCAN_NODE);
|
||||
TFileScanNode fileScanNode = new TFileScanNode();
|
||||
fileScanNode.setTupleId(desc.getId().asInt());
|
||||
if (!preFilterConjuncts.isEmpty()) {
|
||||
if (Config.enable_vectorized_load && vpreFilterConjunct != null) {
|
||||
brokerScanNode.addToPreFilterExprs(vpreFilterConjunct.treeToThrift());
|
||||
fileScanNode.addToPreFilterExprs(vpreFilterConjunct.treeToThrift());
|
||||
} else {
|
||||
for (Expr e : preFilterConjuncts) {
|
||||
brokerScanNode.addToPreFilterExprs(e.treeToThrift());
|
||||
fileScanNode.addToPreFilterExprs(e.treeToThrift());
|
||||
}
|
||||
}
|
||||
}
|
||||
planNode.setBrokerScanNode(brokerScanNode);
|
||||
planNode.setFileScanNode(fileScanNode);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -394,15 +335,8 @@ public class ExternalFileScanNode extends ExternalScanNode {
|
||||
|
||||
@Override
|
||||
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
|
||||
String url;
|
||||
try {
|
||||
url = scanProvider.getMetaStoreUrl();
|
||||
} catch (MetaNotFoundException e) {
|
||||
LOG.warn("Can't get url error", e);
|
||||
url = "Can't get url error.";
|
||||
}
|
||||
return prefix + "DATABASE: " + hmsTable.getDbName() + "\n"
|
||||
+ prefix + "TABLE: " + hmsTable.getName() + "\n"
|
||||
+ prefix + "HIVE URL: " + url + "\n";
|
||||
+ prefix + "HIVE URL: " + scanProvider.getMetaStoreUrl() + "\n";
|
||||
}
|
||||
}
|
||||
|
||||
@ -39,11 +39,13 @@ public interface ExternalFileScanProvider {
|
||||
|
||||
TFileType getTableFileType();
|
||||
|
||||
String getMetaStoreUrl() throws MetaNotFoundException;
|
||||
String getMetaStoreUrl();
|
||||
|
||||
InputSplit[] getSplits(List<Expr> exprs) throws IOException, UserException;
|
||||
|
||||
Table getRemoteHiveTable() throws DdlException, MetaNotFoundException;
|
||||
|
||||
Map<String, String> getTableProperties() throws MetaNotFoundException;
|
||||
|
||||
List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException;
|
||||
}
|
||||
|
||||
@ -28,7 +28,6 @@ import org.apache.doris.thrift.TFileFormatType;
|
||||
import org.apache.doris.thrift.TFileType;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hive.conf.HiveConf;
|
||||
import org.apache.hadoop.hive.metastore.api.FieldSchema;
|
||||
import org.apache.hadoop.hive.metastore.api.Partition;
|
||||
import org.apache.hadoop.hive.metastore.api.Table;
|
||||
@ -77,8 +76,8 @@ public class ExternalHiveScanProvider implements ExternalFileScanProvider {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMetaStoreUrl() throws MetaNotFoundException {
|
||||
return getTableProperties().get(HiveConf.ConfVars.METASTOREURIS.name());
|
||||
public String getMetaStoreUrl() {
|
||||
return hmsTable.getMetastoreUri();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -146,4 +145,9 @@ public class ExternalHiveScanProvider implements ExternalFileScanProvider {
|
||||
public Map<String, String> getTableProperties() throws MetaNotFoundException {
|
||||
return hmsTable.getRemoteTable().getParameters();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
|
||||
return getRemoteHiveTable().getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,8 +19,12 @@ package org.apache.doris.planner.external;
|
||||
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* A file scan provider for hudi.
|
||||
* HudiProvier is extended with hive since they both use input format interface to get the spilt.
|
||||
@ -35,4 +39,9 @@ public class ExternalHudiScanProvider extends ExternalHiveScanProvider {
|
||||
public TFileFormatType getTableFormatType() throws DdlException {
|
||||
return TFileFormatType.FORMAT_PARQUET;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,16 +18,15 @@
|
||||
package org.apache.doris.planner.external;
|
||||
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.catalog.IcebergProperty;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.external.iceberg.HiveCatalog;
|
||||
import org.apache.doris.external.iceberg.util.IcebergUtils;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
import org.apache.doris.thrift.TFileType;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.FileSplit;
|
||||
import org.apache.hadoop.mapred.InputSplit;
|
||||
@ -39,7 +38,10 @@ import org.apache.iceberg.expressions.Expression;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A file scan provider for iceberg.
|
||||
@ -98,8 +100,20 @@ public class ExternalIcebergScanProvider extends ExternalHiveScanProvider {
|
||||
}
|
||||
|
||||
private org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException {
|
||||
HiveCatalog hiveCatalog = new HiveCatalog();
|
||||
hiveCatalog.initialize(new IcebergProperty(getTableProperties()));
|
||||
org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
|
||||
Configuration conf = new Configuration();
|
||||
hiveCatalog.setConf(conf);
|
||||
// initialize hive catalog
|
||||
Map<String, String> catalogProperties = new HashMap<>();
|
||||
catalogProperties.put("hive.metastore.uris", getMetaStoreUrl());
|
||||
catalogProperties.put("uri", getMetaStoreUrl());
|
||||
hiveCatalog.initialize("hive", catalogProperties);
|
||||
|
||||
return hiveCatalog.loadTable(TableIdentifier.of(hmsTable.getDbName(), hmsTable.getName()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
@ -45,4 +45,5 @@ public enum StatisticalType {
|
||||
TABLE_FUNCTION_NODE,
|
||||
UNION_NODE,
|
||||
TABLE_VALUED_FUNCTION_NODE,
|
||||
FILE_SCAN_NODE,
|
||||
}
|
||||
|
||||