[Improvement](multicatalog) support show_partitions for hms catalog (#19242)
* [Improvement](multicatalog) support show_partitions for hms catalog * update according review advice
This commit is contained in:
@ -32,7 +32,7 @@ SHOW PARTITIONS
|
||||
|
||||
### Description
|
||||
|
||||
This statement is used to display partition information
|
||||
This statement is used to display partition information for tables in Internal catalog or Hive Catalog
|
||||
|
||||
grammar:
|
||||
|
||||
@ -42,9 +42,17 @@ grammar:
|
||||
|
||||
illustrate:
|
||||
|
||||
When used in Internal catalog:
|
||||
1. Support the filtering of PartitionId, PartitionName, State, Buckets, ReplicationNum, LastConsistencyCheckTime and other columns
|
||||
2. TEMPORARY specifies to list temporary partitions
|
||||
|
||||
<version since="dev">
|
||||
|
||||
when used in Hive Catalog:
|
||||
Will return all partitions' name. Support multilevel partition table
|
||||
|
||||
</version>
|
||||
|
||||
### Example
|
||||
|
||||
1. Display all non-temporary partition information of the specified table under the specified db
|
||||
|
||||
@ -32,7 +32,7 @@ SHOW PARTITIONS
|
||||
|
||||
### Description
|
||||
|
||||
该语句用于展示分区信息
|
||||
该语句用于展示分区信息。支持 Internal catalog 和 Hive Catalog
|
||||
|
||||
语法:
|
||||
|
||||
@ -42,9 +42,17 @@ SHOW PARTITIONS
|
||||
|
||||
说明:
|
||||
|
||||
对于 Internal catalog:
|
||||
1. 支持PartitionId,PartitionName,State,Buckets,ReplicationNum,LastConsistencyCheckTime等列的过滤
|
||||
2. TEMPORARY指定列出临时分区
|
||||
|
||||
<version since="dev">
|
||||
|
||||
对于 Hive Catalog:
|
||||
支持返回所有分区,包括多级分区
|
||||
|
||||
</version>
|
||||
|
||||
### Example
|
||||
|
||||
1. 展示指定db下指定表的所有非临时分区信息
|
||||
|
||||
@ -18,10 +18,11 @@
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
@ -32,7 +33,8 @@ import org.apache.doris.common.proc.ProcNodeInterface;
|
||||
import org.apache.doris.common.proc.ProcResult;
|
||||
import org.apache.doris.common.proc.ProcService;
|
||||
import org.apache.doris.common.util.OrderByPair;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.datasource.HMSExternalCatalog;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.ShowResultSetMetaData;
|
||||
@ -57,6 +59,7 @@ public class ShowPartitionsStmt extends ShowStmt {
|
||||
private static final String FILTER_REPLICATION_NUM = "ReplicationNum";
|
||||
private static final String FILTER_LAST_CONSISTENCY_CHECK_TIME = "LastConsistencyCheckTime";
|
||||
|
||||
private CatalogIf catalog;
|
||||
private TableName tableName;
|
||||
private Expr whereClause;
|
||||
private List<OrderByElement> orderByElements;
|
||||
@ -80,6 +83,14 @@ public class ShowPartitionsStmt extends ShowStmt {
|
||||
this.isTempPartition = isTempPartition;
|
||||
}
|
||||
|
||||
public CatalogIf getCatalog() {
|
||||
return catalog;
|
||||
}
|
||||
|
||||
public TableName getTableName() {
|
||||
return tableName;
|
||||
}
|
||||
|
||||
public List<OrderByPair> getOrderByPairs() {
|
||||
return orderByPairs;
|
||||
}
|
||||
@ -102,15 +113,19 @@ public class ShowPartitionsStmt extends ShowStmt {
|
||||
// check access
|
||||
String dbName = tableName.getDb();
|
||||
String tblName = tableName.getTbl();
|
||||
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), dbName, tblName,
|
||||
PrivPredicate.SHOW)) {
|
||||
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), catalog.getName(), dbName,
|
||||
tblName, PrivPredicate.SHOW)) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "SHOW PARTITIONS",
|
||||
ConnectContext.get().getQualifiedUser(),
|
||||
ConnectContext.get().getRemoteIP(),
|
||||
dbName + ": " + tblName);
|
||||
}
|
||||
Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
|
||||
Table table = db.getTableOrMetaException(tblName, Table.TableType.OLAP);
|
||||
if (!catalog.isInternalCatalog()) {
|
||||
return;
|
||||
}
|
||||
|
||||
DatabaseIf db = catalog.getDbOrAnalysisException(dbName);
|
||||
TableIf table = db.getTableOrMetaException(tblName, Table.TableType.OLAP);
|
||||
table.readLock();
|
||||
try {
|
||||
// build proc path
|
||||
@ -137,8 +152,16 @@ public class ShowPartitionsStmt extends ShowStmt {
|
||||
public void analyzeImpl(Analyzer analyzer) throws UserException {
|
||||
super.analyze(analyzer);
|
||||
tableName.analyze(analyzer);
|
||||
// disallow external catalog
|
||||
Util.prohibitExternalCatalog(tableName.getCtl(), this.getClass().getSimpleName());
|
||||
catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(tableName.getCtl());
|
||||
if (catalog == null) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_NAME_FOR_CATALOG);
|
||||
}
|
||||
|
||||
// disallow unsupported catalog
|
||||
if (!(catalog.isInternalCatalog() || catalog instanceof HMSExternalCatalog)) {
|
||||
throw new AnalysisException(String.format("Catalog of type '%s' is not allowed in ShowPartitionsStmt",
|
||||
catalog.getType()));
|
||||
}
|
||||
|
||||
// analyze where clause if not null
|
||||
if (whereClause != null) {
|
||||
@ -222,15 +245,19 @@ public class ShowPartitionsStmt extends ShowStmt {
|
||||
public ShowResultSetMetaData getMetaData() {
|
||||
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
|
||||
|
||||
ProcResult result = null;
|
||||
try {
|
||||
result = node.fetchResult();
|
||||
} catch (AnalysisException e) {
|
||||
return builder.build();
|
||||
}
|
||||
if (catalog.isInternalCatalog()) {
|
||||
ProcResult result = null;
|
||||
try {
|
||||
result = node.fetchResult();
|
||||
} catch (AnalysisException e) {
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
for (String col : result.getColumnNames()) {
|
||||
builder.addColumn(new Column(col, ScalarType.createVarchar(30)));
|
||||
for (String col : result.getColumnNames()) {
|
||||
builder.addColumn(new Column(col, ScalarType.createVarchar(30)));
|
||||
}
|
||||
} else {
|
||||
builder.addColumn(new Column("Partition", ScalarType.createVarchar(60)));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.datasource;
|
||||
|
||||
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
@ -50,6 +51,10 @@ public interface CatalogIf<T extends DatabaseIf> {
|
||||
|
||||
List<String> getDbNames();
|
||||
|
||||
default boolean isInternalCatalog() {
|
||||
return this instanceof InternalCatalog;
|
||||
}
|
||||
|
||||
// Will be used when querying the information_schema table
|
||||
// Unable to get db for uninitialized catalog to avoid query timeout
|
||||
default List<String> getDbNamesOrEmpty() {
|
||||
|
||||
@ -171,6 +171,7 @@ import org.apache.doris.common.util.RuntimeProfile;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.datasource.HMSExternalCatalog;
|
||||
import org.apache.doris.external.iceberg.IcebergTableCreationRecord;
|
||||
import org.apache.doris.load.DeleteHandler;
|
||||
import org.apache.doris.load.ExportJob;
|
||||
@ -1525,10 +1526,34 @@ public class ShowExecutor {
|
||||
|
||||
private void handleShowPartitions() throws AnalysisException {
|
||||
ShowPartitionsStmt showStmt = (ShowPartitionsStmt) stmt;
|
||||
ProcNodeInterface procNodeI = showStmt.getNode();
|
||||
Preconditions.checkNotNull(procNodeI);
|
||||
List<List<String>> rows = ((PartitionsProcDir) procNodeI).fetchResultByFilter(showStmt.getFilterMap(),
|
||||
showStmt.getOrderByPairs(), showStmt.getLimitElement()).getRows();
|
||||
if (showStmt.getCatalog().isInternalCatalog()) {
|
||||
ProcNodeInterface procNodeI = showStmt.getNode();
|
||||
Preconditions.checkNotNull(procNodeI);
|
||||
List<List<String>> rows = ((PartitionsProcDir) procNodeI).fetchResultByFilter(showStmt.getFilterMap(),
|
||||
showStmt.getOrderByPairs(), showStmt.getLimitElement()).getRows();
|
||||
resultSet = new ShowResultSet(showStmt.getMetaData(), rows);
|
||||
} else {
|
||||
handleShowHMSTablePartitions(showStmt);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleShowHMSTablePartitions(ShowPartitionsStmt showStmt) {
|
||||
HMSExternalCatalog catalog = (HMSExternalCatalog) (showStmt.getCatalog());
|
||||
List<List<String>> rows = new ArrayList<>();
|
||||
String dbName = ClusterNamespace.getNameFromFullName(showStmt.getTableName().getDb());
|
||||
List<String> partitionNames = catalog.getClient().listPartitionNames(dbName,
|
||||
showStmt.getTableName().getTbl());
|
||||
for (String partition : partitionNames) {
|
||||
List<String> list = new ArrayList<>();
|
||||
list.add(partition);
|
||||
rows.add(list);
|
||||
}
|
||||
|
||||
// sort by partition name
|
||||
rows.sort((x, y) -> {
|
||||
return x.get(0).compareTo(y.get(0));
|
||||
});
|
||||
|
||||
resultSet = new ShowResultSet(showStmt.getMetaData(), rows);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user