[fix](catalog) close connection on refresh (#35426) (#35743)

bp #35426
This commit is contained in:
Mingyu Chen
2024-05-31 22:51:13 +08:00
committed by GitHub
parent b864aa7aa2
commit 1d89dd7607
10 changed files with 55 additions and 103 deletions

View File

@ -71,7 +71,6 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EsTable;
import org.apache.doris.catalog.Function;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.HiveTable;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.InfoSchemaDb;
import org.apache.doris.catalog.JdbcTable;
@ -136,9 +135,6 @@ import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.es.EsRepository;
import org.apache.doris.datasource.hive.HMSCachedClient;
import org.apache.doris.datasource.hive.HiveMetadataOps;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.event.DropPartitionEvent;
import org.apache.doris.nereids.trees.plans.commands.info.DropMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
@ -181,7 +177,6 @@ import lombok.Getter;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
@ -2886,30 +2881,6 @@ public class InternalCatalog implements CatalogIf<Database> {
return checkCreateTableResult(tableName, tableId, result);
}
/**
 * Registers a {@link HiveTable} in the given Doris database after verifying that the
 * referenced table exists in the Hive metastore.
 *
 * <p>A dedicated metastore client is created only for the existence check and is always
 * closed afterwards, whether the check succeeds, fails, or throws.
 *
 * @param db   the Doris database the table is added to
 * @param stmt the CREATE TABLE statement supplying name, columns, properties and comment
 * @throws DdlException if the table is missing in Hive, or if a table with the same name
 *                      already exists in {@code db}
 */
private void createHiveTable(Database db, CreateTableStmt stmt) throws DdlException {
    String tableName = stmt.getTableName();
    List<Column> columns = stmt.getColumns();
    long tableId = Env.getCurrentEnv().getNextId();
    HiveTable hiveTable = new HiveTable(tableId, tableName, columns, stmt.getProperties());
    hiveTable.setComment(stmt.getComment());

    // check hive table whether exists in hive database
    HiveConf hiveConf = new HiveConf();
    hiveConf.set(HMSProperties.HIVE_METASTORE_URIS,
            hiveTable.getHiveProperties().get(HMSProperties.HIVE_METASTORE_URIS));
    String hiveVersion = hiveTable.getHiveProperties().get(HMSProperties.HIVE_VERSION);
    if (!Strings.isNullOrEmpty(hiveVersion)) {
        hiveConf.set(HMSProperties.HIVE_VERSION, hiveVersion);
    }

    HMSCachedClient client = HiveMetadataOps.createCachedClient(hiveConf, 1, null);
    try {
        if (!client.tableExists(hiveTable.getHiveDb(), hiveTable.getHiveTable())) {
            throw new DdlException(String.format("Table [%s] dose not exist in Hive.", hiveTable.getHiveDbTable()));
        }
    } finally {
        // The client exists only for this probe; release it so its connection is not leaked.
        client.close();
    }

    // check hive table if exists in doris database
    if (!db.createTableWithLock(hiveTable, false, stmt.isSetIfNotExists()).first) {
        ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
    }
    LOG.info("successfully create table[{}-{}]", tableName, tableId);
}
private boolean createJdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
String tableName = stmt.getTableName();
List<Column> columns = stmt.getColumns();

View File

@ -110,4 +110,9 @@ public interface HMSCachedClient {
void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions);
void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData);
/**
* Closes the connection held by this client, e.g. the connection to the Hive metastore (HMS).
*/
void close();
}

View File

@ -162,6 +162,14 @@ public class HMSExternalCatalog extends ExternalCatalog {
metadataOps = hiveOps;
}
/**
 * Refresh hook for this catalog.
 *
 * <p>Delegates to the superclass first, then closes {@code metadataOps} when one has been set.
 *
 * @param invalidCache forwarded unchanged to {@code super.onRefresh}
 */
@Override
public void onRefresh(boolean invalidCache) {
    super.onRefresh(invalidCache);
    if (metadataOps == null) {
        // Nothing has been created yet, so there is nothing to close.
        return;
    }
    metadataOps.close();
}
@Override
public List<String> listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();

View File

@ -32,36 +32,25 @@ import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HiveTable;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopUGI;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.thrift.TExprOpcode;
import com.aliyun.datalake.metastore.common.DataLakeConfig;
import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@ -76,7 +65,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import shade.doris.hive.org.apache.thrift.TException;
import java.security.PrivilegedExceptionAction;
import java.time.LocalDateTime;
@ -148,65 +136,6 @@ public class HiveMetaStoreClientHelper {
}
}
/**
 * Builds a metastore client for the given metastore URIs.
 *
 * <p>The client socket timeout is taken from {@code Config.hive_metastore_client_timeout_second}.
 * A {@link ProxyMetaStoreClient} is created when the configured metastore type is "dlf"
 * (case-insensitive); otherwise a {@link HiveMetaStoreClient} is created.
 *
 * @param metaStoreUris value stored as the metastore URIs of the new configuration
 * @return a newly created client; the caller is responsible for closing it
 * @throws DdlException if constructing the client raises a {@link MetaException}
 */
private static IMetaStoreClient getClient(String metaStoreUris) throws DdlException {
    HiveConf hiveConf = new HiveConf();
    hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUris);
    // Use the property key (varname). Enum#name() is the Java constant identifier, which is
    // not a Hive configuration key, so a timeout stored under it would never be read.
    hiveConf.set(ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.varname,
            String.valueOf(Config.hive_metastore_client_timeout_second));
    // NOTE(review): the type is read from a HiveConf created just above and never populated
    // with it here, so it can only be non-null if supplied by default resources — confirm.
    String type = hiveConf.get(HMSProperties.HIVE_METASTORE_TYPE);
    try {
        if ("dlf".equalsIgnoreCase(type)) {
            // For aliyun DLF
            hiveConf.set(DataLakeConfig.CATALOG_CREATE_DEFAULT_DB, "false");
            return new ProxyMetaStoreClient(hiveConf);
        }
        return new HiveMetaStoreClient(hiveConf);
    } catch (MetaException e) {
        LOG.warn("Create HiveMetaStoreClient failed: {}", e.getMessage());
        throw new DdlException("Create HiveMetaStoreClient failed: " + e.getMessage());
    }
}
/**
 * Fetches the metastore {@link Table} that the given {@link HiveTable} refers to.
 *
 * <p>A new metastore client is created from the table's metastore URIs property and is
 * always closed before this method returns, matching the
 * {@code getTable(String, String, String)} overload.
 *
 * @param hiveTable supplies the metastore URIs, Hive database name and Hive table name
 * @return the table returned by the metastore client
 * @throws DdlException if the client cannot be created or the lookup raises a {@link TException}
 */
public static Table getTable(HiveTable hiveTable) throws DdlException {
    IMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HMSProperties.HIVE_METASTORE_URIS));
    try {
        return client.getTable(hiveTable.getHiveDb(), hiveTable.getHiveTable());
    } catch (TException e) {
        LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
        throw new DdlException("Connect hive metastore failed. Error: " + e.getMessage());
    } finally {
        // Release the connection on every path; previously the client was never closed.
        client.close();
    }
}
/**
 * Looks up a table in the Hive metastore by database name and table name.
 * Only for Hudi.
 *
 * <p>The metastore client created for the lookup is closed before returning,
 * on both the success and the failure path.
 *
 * @param dbName database name
 * @param tableName table name
 * @param metaStoreUris hive metastore uris
 * @return the metastore {@link Table} returned by the client
 * @throws DdlException when getting the table from the hive metastore failed.
 */
@Deprecated
public static Table getTable(String dbName, String tableName, String metaStoreUris) throws DdlException {
    final IMetaStoreClient metaStoreClient = getClient(metaStoreUris);
    try {
        return metaStoreClient.getTable(dbName, tableName);
    } catch (TException thriftError) {
        LOG.warn("Hive metastore thrift exception: {}", thriftError.getMessage());
        throw new DdlException("Connect hive metastore failed. Error: " + thriftError.getMessage());
    } finally {
        metaStoreClient.close();
    }
}
/**
* Convert Doris expr to Hive expr, only for partition column
* @param tblName

View File

@ -81,8 +81,8 @@ public class HiveMetadataOps implements ExternalMetadataOps {
return catalog;
}
public static HMSCachedClient createCachedClient(HiveConf hiveConf, int thriftClientPoolSize,
JdbcClientConfig jdbcClientConfig) {
private static HMSCachedClient createCachedClient(HiveConf hiveConf, int thriftClientPoolSize,
JdbcClientConfig jdbcClientConfig) {
if (hiveConf != null) {
return new ThriftHMSCachedClient(hiveConf, thriftClientPoolSize);
}
@ -266,6 +266,11 @@ public class HiveMetadataOps implements ExternalMetadataOps {
return listDatabaseNames().contains(dbName);
}
/**
* Closes this metadata ops instance by closing its underlying {@code client}.
*/
@Override
public void close() {
client.close();
}
/**
* Lists database names by delegating to {@code client.getAllDatabases()}.
*
* @return the list returned by the client
*/
public List<String> listDatabaseNames() {
return client.getAllDatabases();
}

View File

@ -63,6 +63,11 @@ public class PostgreSQLJdbcHMSCachedClient extends JdbcHMSCachedClient {
super(jdbcClientConfig);
}
/**
* Intentionally does nothing; see the comment in the body for the reason.
*/
@Override
public void close() {
// The JDBC connection is used on demand, so we do not need to close it.
}
@Override
public Database getDatabase(String dbName) {
throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");

View File

@ -89,6 +89,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
private static final short MAX_LIST_PARTITION_NUM = Config.max_hive_list_partition_num;
private Queue<ThriftHMSClient> clientPool = new LinkedList<>();
private boolean isClosed = false;
private final int poolSize;
private final HiveConf hiveConf;
@ -100,6 +101,21 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
}
this.hiveConf = hiveConf;
this.poolSize = poolSize;
this.isClosed = false;
}
/**
 * Marks this cached client as closed and closes every thrift client currently in the pool.
 *
 * <p>All work happens while holding the {@code clientPool} monitor. A failure to close one
 * pooled client is logged and does not stop the remaining clients from being closed.
 */
@Override
public void close() {
    synchronized (clientPool) {
        isClosed = true;
        while (!clientPool.isEmpty()) {
            ThriftHMSClient pooledClient = clientPool.poll();
            try {
                pooledClient.close();
            } catch (Exception closeFailure) {
                LOG.warn("failed to close thrift client", closeFailure);
            }
        }
    }
}
@Override
@ -604,7 +620,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
@Override
public void close() throws Exception {
synchronized (clientPool) {
if (throwable != null || clientPool.size() > poolSize) {
if (isClosed || throwable != null || clientPool.size() > poolSize) {
client.close();
} else {
clientPool.offer(this);

View File

@ -64,6 +64,10 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
return catalog;
}
/**
* No-op: this implementation has an empty body and releases nothing here.
*/
@Override
public void close() {
}
@Override
public boolean tableExist(String dbName, String tblName) {
return catalog.tableExists(TableIdentifier.of(dbName, tblName));

View File

@ -82,4 +82,9 @@ public interface ExternalMetadataOps {
boolean tableExist(String dbName, String tblName);
boolean databaseExist(String dbName);
/**
* Closes the connection held by this metadata ops instance, e.g. the connection to HMS.
*/
void close();
}