[feature](HiveCatalog) Support for getting hive meta data from relational databases under HMS (#28188)
This commit is contained in:
@ -26,8 +26,8 @@ import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.datasource.HMSExternalCatalog;
|
||||
import org.apache.doris.datasource.hive.HMSCachedClient;
|
||||
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
|
||||
import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient;
|
||||
import org.apache.doris.statistics.AnalysisInfo;
|
||||
import org.apache.doris.statistics.BaseAnalysisTask;
|
||||
import org.apache.doris.statistics.ColumnStatistic;
|
||||
@ -382,25 +382,25 @@ public class HMSExternalTable extends ExternalTable {
|
||||
}
|
||||
|
||||
public List<ColumnStatisticsObj> getHiveTableColumnStats(List<String> columns) {
|
||||
PooledHiveMetaStoreClient client = ((HMSExternalCatalog) catalog).getClient();
|
||||
HMSCachedClient client = ((HMSExternalCatalog) catalog).getClient();
|
||||
return client.getTableColumnStatistics(dbName, name, columns);
|
||||
}
|
||||
|
||||
public Map<String, List<ColumnStatisticsObj>> getHivePartitionColumnStats(
|
||||
List<String> partNames, List<String> columns) {
|
||||
PooledHiveMetaStoreClient client = ((HMSExternalCatalog) catalog).getClient();
|
||||
HMSCachedClient client = ((HMSExternalCatalog) catalog).getClient();
|
||||
return client.getPartitionColumnStatistics(dbName, name, partNames, columns);
|
||||
}
|
||||
|
||||
public Partition getPartition(List<String> partitionValues) {
|
||||
PooledHiveMetaStoreClient client = ((HMSExternalCatalog) catalog).getClient();
|
||||
HMSCachedClient client = ((HMSExternalCatalog) catalog).getClient();
|
||||
return client.getPartition(dbName, name, partitionValues);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getPartitionNames() {
|
||||
makeSureInitialized();
|
||||
PooledHiveMetaStoreClient client = ((HMSExternalCatalog) catalog).getClient();
|
||||
HMSCachedClient client = ((HMSExternalCatalog) catalog).getClient();
|
||||
List<String> names = client.listPartitionNames(dbName, name);
|
||||
return new HashSet<>(names);
|
||||
}
|
||||
|
||||
@ -24,4 +24,8 @@ public class HMSClientException extends RuntimeException {
|
||||
super(String.format(format, msg) + (cause == null ? "" : ". reason: " + Util.getRootCauseMessage(cause)),
|
||||
cause);
|
||||
}
|
||||
|
||||
public HMSClientException(String format, Object... msg) {
|
||||
super(String.format(format, msg));
|
||||
}
|
||||
}
|
||||
|
||||
@ -25,8 +25,10 @@ import org.apache.doris.catalog.external.ExternalTable;
|
||||
import org.apache.doris.catalog.external.HMSExternalDatabase;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient;
|
||||
import org.apache.doris.datasource.hive.HMSCachedClient;
|
||||
import org.apache.doris.datasource.hive.HMSCachedClientFactory;
|
||||
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
|
||||
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
|
||||
import org.apache.doris.datasource.property.PropertyConverter;
|
||||
import org.apache.doris.datasource.property.constants.HMSProperties;
|
||||
|
||||
@ -54,7 +56,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
private static final Logger LOG = LogManager.getLogger(HMSExternalCatalog.class);
|
||||
|
||||
private static final int MIN_CLIENT_POOL_SIZE = 8;
|
||||
protected PooledHiveMetaStoreClient client;
|
||||
protected HMSCachedClient client;
|
||||
// Record the latest synced event id when processing hive events
|
||||
// Must set to -1 otherwise client.getNextNotification will throw exception
|
||||
// Reference to https://github.com/apDdlache/doris/issues/18251
|
||||
@ -144,33 +146,44 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
|
||||
@Override
|
||||
protected void initLocalObjectsImpl() {
|
||||
HiveConf hiveConf = new HiveConf();
|
||||
for (Map.Entry<String, String> kv : catalogProperty.getHadoopProperties().entrySet()) {
|
||||
hiveConf.set(kv.getKey(), kv.getValue());
|
||||
}
|
||||
hiveConf.set(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.name(),
|
||||
String.valueOf(Config.hive_metastore_client_timeout_second));
|
||||
String authentication = catalogProperty.getOrDefault(
|
||||
HdfsResource.HADOOP_SECURITY_AUTHENTICATION, "");
|
||||
if (AuthType.KERBEROS.getDesc().equals(authentication)) {
|
||||
hiveConf.set(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, authentication);
|
||||
UserGroupInformation.setConfiguration(hiveConf);
|
||||
try {
|
||||
/**
|
||||
* Because metastore client is created by using
|
||||
* {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient#getProxy}
|
||||
* it will relogin when TGT is expired, so we don't need to relogin manually.
|
||||
*/
|
||||
UserGroupInformation.loginUserFromKeytab(
|
||||
catalogProperty.getOrDefault(HdfsResource.HADOOP_KERBEROS_PRINCIPAL, ""),
|
||||
catalogProperty.getOrDefault(HdfsResource.HADOOP_KERBEROS_KEYTAB, ""));
|
||||
} catch (IOException e) {
|
||||
throw new HMSClientException("login with kerberos auth failed for catalog %s", e, this.getName());
|
||||
HiveConf hiveConf = null;
|
||||
JdbcClientConfig jdbcClientConfig = null;
|
||||
String hiveMetastoreType = catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_TYPE, "");
|
||||
if (hiveMetastoreType.equalsIgnoreCase("jdbc")) {
|
||||
jdbcClientConfig = new JdbcClientConfig();
|
||||
jdbcClientConfig.setUser(catalogProperty.getOrDefault("user", ""));
|
||||
jdbcClientConfig.setPassword(catalogProperty.getOrDefault("password", ""));
|
||||
jdbcClientConfig.setJdbcUrl(catalogProperty.getOrDefault("jdbc_url", ""));
|
||||
jdbcClientConfig.setDriverUrl(catalogProperty.getOrDefault("driver_url", ""));
|
||||
jdbcClientConfig.setDriverClass(catalogProperty.getOrDefault("driver_class", ""));
|
||||
} else {
|
||||
hiveConf = new HiveConf();
|
||||
for (Map.Entry<String, String> kv : catalogProperty.getHadoopProperties().entrySet()) {
|
||||
hiveConf.set(kv.getKey(), kv.getValue());
|
||||
}
|
||||
hiveConf.set(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.name(),
|
||||
String.valueOf(Config.hive_metastore_client_timeout_second));
|
||||
String authentication = catalogProperty.getOrDefault(
|
||||
HdfsResource.HADOOP_SECURITY_AUTHENTICATION, "");
|
||||
if (AuthType.KERBEROS.getDesc().equals(authentication)) {
|
||||
hiveConf.set(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, authentication);
|
||||
UserGroupInformation.setConfiguration(hiveConf);
|
||||
try {
|
||||
/**
|
||||
* Because metastore client is created by using
|
||||
* {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient#getProxy}
|
||||
* it will relogin when TGT is expired, so we don't need to relogin manually.
|
||||
*/
|
||||
UserGroupInformation.loginUserFromKeytab(
|
||||
catalogProperty.getOrDefault(HdfsResource.HADOOP_KERBEROS_PRINCIPAL, ""),
|
||||
catalogProperty.getOrDefault(HdfsResource.HADOOP_KERBEROS_KEYTAB, ""));
|
||||
} catch (IOException e) {
|
||||
throw new HMSClientException("login with kerberos auth failed for catalog %s", e, this.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
client = new PooledHiveMetaStoreClient(hiveConf,
|
||||
Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size));
|
||||
client = HMSCachedClientFactory.createCachedClient(hiveConf,
|
||||
Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size), jdbcClientConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -201,7 +214,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
return hmsExternalDatabase.getTable(getRealTableName(tblName)).isPresent();
|
||||
}
|
||||
|
||||
public PooledHiveMetaStoreClient getClient() {
|
||||
public HMSCachedClient getClient() {
|
||||
makeSureInitialized();
|
||||
return client;
|
||||
}
|
||||
|
||||
@ -131,7 +131,8 @@ import org.apache.doris.common.util.QueryableReentrantLock;
|
||||
import org.apache.doris.common.util.SqlParserUtils;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient;
|
||||
import org.apache.doris.datasource.hive.HMSCachedClient;
|
||||
import org.apache.doris.datasource.hive.HMSCachedClientFactory;
|
||||
import org.apache.doris.datasource.property.constants.HMSProperties;
|
||||
import org.apache.doris.external.elasticsearch.EsRepository;
|
||||
import org.apache.doris.persist.AlterDatabasePropertyInfo;
|
||||
@ -2700,7 +2701,7 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
if (!Strings.isNullOrEmpty(hiveTable.getHiveProperties().get(HMSProperties.HIVE_VERSION))) {
|
||||
hiveConf.set(HMSProperties.HIVE_VERSION, hiveTable.getHiveProperties().get(HMSProperties.HIVE_VERSION));
|
||||
}
|
||||
PooledHiveMetaStoreClient client = new PooledHiveMetaStoreClient(hiveConf, 1);
|
||||
HMSCachedClient client = new HMSCachedClientFactory().createCachedClient(hiveConf, 1, null);
|
||||
if (!client.tableExists(hiveTable.getHiveDb(), hiveTable.getHiveTable())) {
|
||||
throw new DdlException(String.format("Table [%s] dose not exist in Hive.", hiveTable.getHiveDbTable()));
|
||||
}
|
||||
|
||||
@ -0,0 +1,81 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.datasource.hive;
|
||||
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
|
||||
|
||||
import org.apache.hadoop.hive.common.ValidWriteIdList;
|
||||
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
|
||||
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
|
||||
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
|
||||
import org.apache.hadoop.hive.metastore.api.Database;
|
||||
import org.apache.hadoop.hive.metastore.api.FieldSchema;
|
||||
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
|
||||
import org.apache.hadoop.hive.metastore.api.Partition;
|
||||
import org.apache.hadoop.hive.metastore.api.Table;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A hive metastore client pool for a specific catalog with hive configuration.
|
||||
* Currently, we support obtain hive metadata from thrift protocol and JDBC protocol.
|
||||
*/
|
||||
public interface HMSCachedClient {
|
||||
Database getDatabase(String dbName);
|
||||
|
||||
List<String> getAllDatabases();
|
||||
|
||||
List<String> getAllTables(String dbName);
|
||||
|
||||
boolean tableExists(String dbName, String tblName);
|
||||
|
||||
List<String> listPartitionNames(String dbName, String tblName);
|
||||
|
||||
List<String> listPartitionNames(String dbName, String tblName, long maxListPartitionNum);
|
||||
|
||||
Partition getPartition(String dbName, String tblName, List<String> partitionValues);
|
||||
|
||||
List<Partition> getPartitions(String dbName, String tblName, List<String> partitionNames);
|
||||
|
||||
Table getTable(String dbName, String tblName);
|
||||
|
||||
List<FieldSchema> getSchema(String dbName, String tblName);
|
||||
|
||||
List<ColumnStatisticsObj> getTableColumnStatistics(String dbName, String tblName,
|
||||
List<String> columns);
|
||||
|
||||
Map<String, List<ColumnStatisticsObj>> getPartitionColumnStatistics(
|
||||
String dbName, String tblName, List<String> partNames, List<String> columns);
|
||||
|
||||
CurrentNotificationEventId getCurrentNotificationEventId();
|
||||
|
||||
NotificationEventResponse getNextNotification(long lastEventId,
|
||||
int maxEvents,
|
||||
IMetaStoreClient.NotificationFilter filter) throws MetastoreNotificationFetchException;
|
||||
|
||||
long openTxn(String user);
|
||||
|
||||
void commitTxn(long txnId);
|
||||
|
||||
ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId);
|
||||
|
||||
void acquireSharedLock(String queryId, long txnId, String user, TableName tblName,
|
||||
List<String> partitionNames, long timeoutMs);
|
||||
}
|
||||
@ -0,0 +1,42 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.datasource.hive;
|
||||
|
||||
import org.apache.doris.catalog.JdbcResource;
|
||||
import org.apache.doris.datasource.jdbc.client.JdbcClient;
|
||||
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hive.conf.HiveConf;
|
||||
|
||||
public class HMSCachedClientFactory {
|
||||
public static HMSCachedClient createCachedClient(HiveConf hiveConf, int thriftClientPoolSize,
|
||||
JdbcClientConfig jdbcClientConfig) {
|
||||
if (hiveConf != null) {
|
||||
return new ThriftHMSCachedClient(hiveConf, thriftClientPoolSize);
|
||||
}
|
||||
Preconditions.checkNotNull(jdbcClientConfig, "hiveConf and jdbcClientConfig are both null");
|
||||
String dbType = JdbcClient.parseDbType(jdbcClientConfig.getJdbcUrl());
|
||||
switch (dbType) {
|
||||
case JdbcResource.POSTGRESQL:
|
||||
return new PostgreSQLJdbcHMSCachedClient(jdbcClientConfig);
|
||||
default:
|
||||
throw new IllegalArgumentException("Unsupported DB type: " + dbType);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -63,7 +63,7 @@ public class HiveTransaction {
|
||||
return isFullAcid;
|
||||
}
|
||||
|
||||
public ValidWriteIdList getValidWriteIds(PooledHiveMetaStoreClient client) {
|
||||
public ValidWriteIdList getValidWriteIds(HMSCachedClient client) {
|
||||
if (validWriteIdList == null) {
|
||||
TableName tableName = new TableName(hiveTable.getCatalog().getName(), hiveTable.getDbName(),
|
||||
hiveTable.getName());
|
||||
|
||||
@ -0,0 +1,37 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.datasource.hive;
|
||||
|
||||
import org.apache.doris.datasource.jdbc.client.JdbcClient;
|
||||
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* This class uses the JDBC protocol to directly access the relational databases under HMS
|
||||
* to obtain Hive metadata information
|
||||
*/
|
||||
public abstract class JdbcHMSCachedClient extends JdbcClient implements HMSCachedClient {
|
||||
protected JdbcClientConfig jdbcClientConfig;
|
||||
|
||||
protected JdbcHMSCachedClient(JdbcClientConfig jdbcClientConfig) {
|
||||
super(jdbcClientConfig);
|
||||
Preconditions.checkNotNull(jdbcClientConfig, "JdbcClientConfig can not be null");
|
||||
this.jdbcClientConfig = jdbcClientConfig;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,477 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.datasource.hive;
|
||||
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.catalog.JdbcTable;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.datasource.HMSClientException;
|
||||
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
|
||||
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
|
||||
import org.apache.doris.thrift.TOdbcTableType;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableList.Builder;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.hive.common.ValidWriteIdList;
|
||||
import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
|
||||
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
|
||||
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
|
||||
import org.apache.hadoop.hive.metastore.api.Database;
|
||||
import org.apache.hadoop.hive.metastore.api.FieldSchema;
|
||||
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
|
||||
import org.apache.hadoop.hive.metastore.api.Partition;
|
||||
import org.apache.hadoop.hive.metastore.api.PrincipalType;
|
||||
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
|
||||
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
|
||||
import org.apache.hadoop.hive.metastore.api.Table;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class PostgreSQLJdbcHMSCachedClient extends JdbcHMSCachedClient {
|
||||
private static final Logger LOG = LogManager.getLogger(PostgreSQLJdbcHMSCachedClient.class);
|
||||
|
||||
public PostgreSQLJdbcHMSCachedClient(JdbcClientConfig jdbcClientConfig) {
|
||||
super(jdbcClientConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Database getDatabase(String dbName) {
|
||||
throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getAllDatabases() {
|
||||
String nameFiled = JdbcTable.databaseProperName(TOdbcTableType.POSTGRESQL, "NAME");
|
||||
String tableName = JdbcTable.databaseProperName(TOdbcTableType.POSTGRESQL, "DBS");
|
||||
String sql = String.format("SELECT %s FROM %s;", nameFiled, tableName);
|
||||
LOG.debug("getAllDatabases exec sql: {}", sql);
|
||||
try (Connection conn = getConnection();
|
||||
PreparedStatement stmt = conn.prepareStatement(sql);
|
||||
ResultSet rs = stmt.executeQuery()) {
|
||||
Builder<String> builder = ImmutableList.builder();
|
||||
while (rs.next()) {
|
||||
String hiveDatabaseName = rs.getString("NAME");
|
||||
builder.add(hiveDatabaseName);
|
||||
}
|
||||
return builder.build();
|
||||
} catch (Exception e) {
|
||||
throw new HMSClientException("failed to get all database from hms client", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getAllTables(String dbName) {
|
||||
String sql = "SELECT \"TBL_NAME\" FROM \"TBLS\" join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\""
|
||||
+ " WHERE \"DBS\".\"NAME\" = '" + dbName + "';";
|
||||
LOG.debug("getAllTables exec sql: {}", sql);
|
||||
|
||||
try (Connection conn = getConnection();
|
||||
PreparedStatement stmt = conn.prepareStatement(sql);
|
||||
ResultSet rs = stmt.executeQuery()) {
|
||||
Builder<String> builder = ImmutableList.builder();
|
||||
while (rs.next()) {
|
||||
String hiveDatabaseName = rs.getString("TBL_NAME");
|
||||
builder.add(hiveDatabaseName);
|
||||
}
|
||||
return builder.build();
|
||||
} catch (Exception e) {
|
||||
throw new HMSClientException("failed to get all tables for db %s", e, dbName);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tableExists(String dbName, String tblName) {
|
||||
List<String> allTables = getAllTables(dbName);
|
||||
return allTables.contains(tblName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> listPartitionNames(String dbName, String tblName) {
|
||||
return listPartitionNames(dbName, tblName, (long) -1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> listPartitionNames(String dbName, String tblName, long maxListPartitionNum) {
|
||||
String sql = String.format("SELECT \"PART_NAME\" from \"PARTITIONS\" WHERE \"TBL_ID\" = ("
|
||||
+ "SELECT \"TBL_ID\" FROM \"TBLS\" join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\""
|
||||
+ " WHERE \"DBS\".\"NAME\" = '%s' AND \"TBLS\".\"TBL_NAME\"='%s');", dbName, tblName);
|
||||
LOG.debug("listPartitionNames exec sql: {}", sql);
|
||||
|
||||
try (Connection conn = getConnection();
|
||||
PreparedStatement stmt = conn.prepareStatement(sql);
|
||||
ResultSet rs = stmt.executeQuery()) {
|
||||
Builder<String> builder = ImmutableList.builder();
|
||||
while (rs.next()) {
|
||||
String hivePartitionName = rs.getString("PART_NAME");
|
||||
builder.add(hivePartitionName);
|
||||
}
|
||||
return builder.build();
|
||||
} catch (Exception e) {
|
||||
throw new HMSClientException("failed to list partition names for table %s in db %s", e, tblName, dbName);
|
||||
}
|
||||
}
|
||||
|
||||
// not used
|
||||
@Override
|
||||
public Partition getPartition(String dbName, String tblName, List<String> partitionValues) {
|
||||
LOG.debug("getPartition partitionValues: {}", partitionValues);
|
||||
String partitionName = Joiner.on("/").join(partitionValues);
|
||||
ImmutableList<String> partitionNames = ImmutableList.of(partitionName);
|
||||
LOG.debug("getPartition partitionNames: {}", partitionNames);
|
||||
List<Partition> partitions = getPartitionsByNames(dbName, tblName, partitionNames);
|
||||
if (!partitions.isEmpty()) {
|
||||
return partitions.get(0);
|
||||
}
|
||||
throw new HMSClientException("Can not get partition of partitionName = " + partitionName
|
||||
+ ", from " + dbName + "." + tblName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Partition> getPartitions(String dbName, String tblName, List<String> partitionNames) {
|
||||
return getPartitionsByNames(dbName, tblName, partitionNames);
|
||||
}
|
||||
|
||||
private List<Partition> getPartitionsByNames(String dbName, String tblName, List<String> partitionNames) {
|
||||
List<String> partitionNamesWithQuote = partitionNames.stream().map(partitionName -> "'" + partitionName + "'")
|
||||
.collect(Collectors.toList());
|
||||
String partitionNamesString = Joiner.on(", ").join(partitionNamesWithQuote);
|
||||
String sql = String.format("SELECT \"PART_ID\", \"PARTITIONS\".\"CREATE_TIME\","
|
||||
+ " \"PARTITIONS\".\"LAST_ACCESS_TIME\","
|
||||
+ " \"PART_NAME\", \"PARTITIONS\".\"SD_ID\" FROM \"PARTITIONS\""
|
||||
+ " join \"TBLS\" on \"TBLS\".\"TBL_ID\" = \"PARTITIONS\".\"TBL_ID\""
|
||||
+ " join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\""
|
||||
+ " WHERE \"DBS\".\"NAME\" = '%s' AND \"TBLS\".\"TBL_NAME\"='%s'"
|
||||
+ " AND \"PART_NAME\" in (%s);",
|
||||
dbName, tblName, partitionNamesString);
|
||||
LOG.debug("getPartitionsByNames exec sql: {}", sql);
|
||||
|
||||
try (Connection conn = getConnection();
|
||||
PreparedStatement stmt = conn.prepareStatement(sql);
|
||||
ResultSet rs = stmt.executeQuery()) {
|
||||
Builder<Partition> builder = ImmutableList.builder();
|
||||
while (rs.next()) {
|
||||
Partition partition = new Partition();
|
||||
partition.setDbName(dbName);
|
||||
partition.setTableName(tblName);
|
||||
partition.setCreateTime(rs.getInt("CREATE_TIME"));
|
||||
partition.setLastAccessTime(rs.getInt("LAST_ACCESS_TIME"));
|
||||
|
||||
// set partition values
|
||||
partition.setValues(getPartitionValues(rs.getInt("PART_ID")));
|
||||
|
||||
// set SD
|
||||
StorageDescriptor storageDescriptor = getStorageDescriptor(rs.getInt("SD_ID"));
|
||||
partition.setSd(storageDescriptor);
|
||||
|
||||
builder.add(partition);
|
||||
}
|
||||
return builder.build();
|
||||
} catch (Exception e) {
|
||||
throw new HMSClientException("failed to get partition for table %s in db %s with value %s", e, tblName,
|
||||
dbName, partitionNames);
|
||||
}
|
||||
}
|
||||
|
||||
private List<String> getPartitionValues(int partitionId) {
|
||||
String sql = String.format("SELECT \"PART_KEY_VAL\" FROM \"PARTITION_KEY_VALS\""
|
||||
+ " WHERE \"PART_ID\" = " + partitionId);
|
||||
LOG.debug("getPartitionValues exec sql: {}", sql);
|
||||
try (Connection conn = getConnection();
|
||||
PreparedStatement stmt = conn.prepareStatement(sql);
|
||||
ResultSet rs = stmt.executeQuery()) {
|
||||
Builder<String> builder = ImmutableList.builder();
|
||||
while (rs.next()) {
|
||||
builder.add(rs.getString("PART_KEY_VAL"));
|
||||
}
|
||||
return builder.build();
|
||||
} catch (Exception e) {
|
||||
throw new HMSClientException("failed to get partition Value for partitionId %s", e, partitionId);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Table getTable(String dbName, String tblName) {
|
||||
String sql = "SELECT \"TBL_ID\", \"TBL_NAME\", \"DBS\".\"NAME\", \"OWNER\", \"CREATE_TIME\","
|
||||
+ " \"LAST_ACCESS_TIME\", \"LAST_ACCESS_TIME\", \"RETENTION\", \"TBLS\".\"SD_ID\", "
|
||||
+ " \"IS_REWRITE_ENABLED\", \"VIEW_EXPANDED_TEXT\", \"VIEW_ORIGINAL_TEXT\", \"DBS\".\"OWNER_TYPE\""
|
||||
+ " FROM \"TBLS\" join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\" "
|
||||
+ " WHERE \"DBS\".\"NAME\" = '" + dbName + "' AND \"TBLS\".\"TBL_NAME\"='" + tblName + "';";
|
||||
LOG.debug("getTable exec sql: {}", sql);
|
||||
|
||||
try (Connection conn = getConnection();
|
||||
PreparedStatement stmt = conn.prepareStatement(sql);
|
||||
ResultSet rs = stmt.executeQuery()) {
|
||||
Table hiveTable = new Table();
|
||||
if (rs.next()) {
|
||||
hiveTable.setTableName(rs.getString("TBL_NAME"));
|
||||
hiveTable.setDbName(rs.getString("NAME"));
|
||||
hiveTable.setOwner(rs.getString("OWNER"));
|
||||
hiveTable.setCreateTime(rs.getInt("CREATE_TIME"));
|
||||
hiveTable.setLastAccessTime(rs.getInt("LAST_ACCESS_TIME"));
|
||||
hiveTable.setRetention(rs.getInt("RETENTION"));
|
||||
hiveTable.setOwnerType(getOwnerType(rs.getString("OWNER_TYPE")));
|
||||
hiveTable.setRewriteEnabled(rs.getBoolean("IS_REWRITE_ENABLED"));
|
||||
hiveTable.setViewExpandedText(rs.getString("VIEW_EXPANDED_TEXT"));
|
||||
hiveTable.setViewOriginalText(rs.getString("VIEW_ORIGINAL_TEXT"));
|
||||
|
||||
hiveTable.setSd(getStorageDescriptor(rs.getInt("SD_ID")));
|
||||
hiveTable.setParameters(getTableParameters(rs.getInt("TBL_ID")));
|
||||
hiveTable.setPartitionKeys(getTablePartitionKeys(rs.getInt("TBL_ID")));
|
||||
return hiveTable;
|
||||
}
|
||||
throw new HMSClientException("Can not get Table from PG databases. dbName = " + dbName
|
||||
+ ", tblName = " + tblName);
|
||||
} catch (Exception e) {
|
||||
throw new HMSClientException("failed to get table %s in db %s from hms client", e, tblName, dbName);
|
||||
}
|
||||
}
|
||||
|
||||
private StorageDescriptor getStorageDescriptor(int sdId) {
|
||||
String sql = "SELECT * from \"SDS\" WHERE \"SD_ID\" = " + sdId;
|
||||
LOG.debug("getStorageDescriptorByDbAndTable exec sql: {}", sql);
|
||||
|
||||
StorageDescriptor sd = new StorageDescriptor();
|
||||
sd.setCols(getSchemaExcludePartitionKeys(sdId));
|
||||
|
||||
try (Connection conn = getConnection();
|
||||
PreparedStatement stmt = conn.prepareStatement(sql);
|
||||
ResultSet rs = stmt.executeQuery()) {
|
||||
if (rs.next()) {
|
||||
sd.setSerdeInfo(getSerdeInfo(rs.getInt("SERDE_ID")));
|
||||
sd.setInputFormat(rs.getString("INPUT_FORMAT"));
|
||||
sd.setCompressed(rs.getBoolean("IS_COMPRESSED"));
|
||||
sd.setLocation(rs.getString("LOCATION"));
|
||||
sd.setNumBuckets(rs.getInt("NUM_BUCKETS"));
|
||||
sd.setOutputFormat(rs.getString("OUTPUT_FORMAT"));
|
||||
sd.setStoredAsSubDirectories(rs.getBoolean("IS_STOREDASSUBDIRECTORIES"));
|
||||
return sd;
|
||||
}
|
||||
throw new HMSClientException("Can not get StorageDescriptor from PG, SD_ID = " + sdId);
|
||||
} catch (Exception e) {
|
||||
throw new HMSClientException("failed to get StorageDescriptor in sdId %s", e, sdId);
|
||||
}
|
||||
}
|
||||
|
||||
private SerDeInfo getSerdeInfo(int serdeId) {
|
||||
String sql = "SELECT * FROM \"SERDES\" WHERE \"SERDE_ID\" = " + serdeId;
|
||||
LOG.debug("getSerdeInfo exec sql: {}", sql);
|
||||
|
||||
SerDeInfo serDeInfo = new SerDeInfo();
|
||||
serDeInfo.setParameters(getSerdeInfoParameters(serdeId));
|
||||
|
||||
try (Connection conn = getConnection();
|
||||
PreparedStatement stmt = conn.prepareStatement(sql);
|
||||
ResultSet rs = stmt.executeQuery()) {
|
||||
if (rs.next()) {
|
||||
serDeInfo.setName(rs.getString("NAME"));
|
||||
serDeInfo.setSerializationLib(rs.getString("SLIB"));
|
||||
return serDeInfo;
|
||||
}
|
||||
throw new HMSClientException("Can not get SerDeInfo from PG databases, serdeId = " + serdeId + ".");
|
||||
} catch (Exception e) {
|
||||
throw new HMSClientException("failed to get SerdeInfo in serdeId %s", e, serdeId);
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, String> getSerdeInfoParameters(int serdeId) {
|
||||
String sql = "SELECT \"PARAM_KEY\", \"PARAM_VALUE\" from \"SERDE_PARAMS\" WHERE \"SERDE_ID\" = " + serdeId;
|
||||
LOG.debug("getSerdeInfoParameters exec sql: {}", sql);
|
||||
|
||||
try (Connection conn = getConnection();
|
||||
PreparedStatement stmt = conn.prepareStatement(sql);
|
||||
ResultSet rs = stmt.executeQuery()) {
|
||||
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
|
||||
while (rs.next()) {
|
||||
builder.put(rs.getString("PARAM_KEY"), rs.getString("PARAM_VALUE"));
|
||||
}
|
||||
return builder.build();
|
||||
} catch (Exception e) {
|
||||
throw new HMSClientException("failed to get SerdeInfoParameters in serdeId %s", e, serdeId);
|
||||
}
|
||||
}
|
||||
|
||||
private List<FieldSchema> getTablePartitionKeys(int tableId) {
|
||||
String sql = "SELECT \"PKEY_NAME\", \"PKEY_TYPE\", \"PKEY_COMMENT\" from \"PARTITION_KEYS\""
|
||||
+ " WHERE \"TBL_ID\"= " + tableId;
|
||||
LOG.debug("getTablePartitionKeys exec sql: {}", sql);
|
||||
|
||||
try (Connection conn = getConnection();
|
||||
PreparedStatement stmt = conn.prepareStatement(sql);
|
||||
ResultSet rs = stmt.executeQuery()) {
|
||||
Builder<FieldSchema> builder = ImmutableList.builder();
|
||||
while (rs.next()) {
|
||||
FieldSchema fieldSchema = new FieldSchema(rs.getString("PKEY_NAME"),
|
||||
rs.getString("PKEY_TYPE"), rs.getString("PKEY_COMMENT"));
|
||||
builder.add(fieldSchema);
|
||||
}
|
||||
|
||||
List<FieldSchema> fieldSchemas = builder.build();
|
||||
// must reverse fields
|
||||
List<FieldSchema> reversedFieldSchemas = Lists.newArrayList(fieldSchemas);
|
||||
Collections.reverse(reversedFieldSchemas);
|
||||
return reversedFieldSchemas;
|
||||
} catch (Exception e) {
|
||||
throw new HMSClientException("failed to get TablePartitionKeys in tableId %s", e, tableId);
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, String> getTableParameters(int tableId) {
|
||||
String sql = "SELECT \"PARAM_KEY\", \"PARAM_VALUE\" from \"TABLE_PARAMS\" WHERE \"TBL_ID\" = " + tableId;
|
||||
LOG.debug("getParameters exec sql: {}", sql);
|
||||
|
||||
try (Connection conn = getConnection();
|
||||
PreparedStatement stmt = conn.prepareStatement(sql);
|
||||
ResultSet rs = stmt.executeQuery()) {
|
||||
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
|
||||
while (rs.next()) {
|
||||
builder.put(rs.getString("PARAM_KEY"), rs.getString("PARAM_VALUE"));
|
||||
}
|
||||
return builder.build();
|
||||
} catch (Exception e) {
|
||||
throw new HMSClientException("failed to get TableParameters in tableId %s", e, tableId);
|
||||
}
|
||||
}
|
||||
|
||||
private PrincipalType getOwnerType(String ownerTypeString) {
|
||||
switch (ownerTypeString) {
|
||||
case "USER":
|
||||
return PrincipalType.findByValue(1);
|
||||
case "ROLE":
|
||||
return PrincipalType.findByValue(2);
|
||||
case "GROUP":
|
||||
return PrincipalType.findByValue(3);
|
||||
default:
|
||||
throw new HMSClientException("Unknown owner type of this table");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<FieldSchema> getSchema(String dbName, String tblName) {
|
||||
String sql = "SELECT \"COLUMN_NAME\", \"TYPE_NAME\", \"COMMENT\", \"TBLS\".\"TBL_ID\""
|
||||
+ " FROM \"TBLS\" join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\""
|
||||
+ " join \"SDS\" on \"SDS\".\"SD_ID\" = \"TBLS\".\"SD_ID\""
|
||||
+ " join \"COLUMNS_V2\" on \"COLUMNS_V2\".\"CD_ID\" = \"SDS\".\"CD_ID\""
|
||||
+ " WHERE \"DBS\".\"NAME\" = '" + dbName + "' AND \"TBLS\".\"TBL_NAME\"='" + tblName + "';";
|
||||
LOG.debug("getSchema exec sql: {}", sql);
|
||||
|
||||
Builder<FieldSchema> builder = ImmutableList.builder();
|
||||
int tableId = -1;
|
||||
try (Connection conn = getConnection();
|
||||
PreparedStatement stmt = conn.prepareStatement(sql);
|
||||
ResultSet rs = stmt.executeQuery()) {
|
||||
while (rs.next()) {
|
||||
FieldSchema fieldSchema = new FieldSchema(rs.getString("COLUMN_NAME"),
|
||||
rs.getString("TYPE_NAME"), rs.getString("COMMENT"));
|
||||
builder.add(fieldSchema);
|
||||
// actually, all resultSets have the same TBL_ID.
|
||||
tableId = rs.getInt("TBL_ID");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new HMSClientException("Can not get schema of db = " + dbName + ", table = " + tblName);
|
||||
}
|
||||
|
||||
// add partition columns
|
||||
getTablePartitionKeys(tableId).stream().forEach(field -> builder.add(field));
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private List<FieldSchema> getSchemaExcludePartitionKeys(int sdId) {
|
||||
String sql = "SELECT \"COLUMN_NAME\", \"TYPE_NAME\", \"COMMENT\""
|
||||
+ " FROM \"SDS\" join \"COLUMNS_V2\" on \"COLUMNS_V2\".\"CD_ID\" = \"SDS\".\"CD_ID\""
|
||||
+ " WHERE \"SDS\".\"SD_ID\" = " + sdId;
|
||||
LOG.debug("getSchema exec sql: {}", sql);
|
||||
|
||||
Builder<FieldSchema> colsExcludePartitionKeys = ImmutableList.builder();
|
||||
try (Connection conn = getConnection();
|
||||
PreparedStatement stmt = conn.prepareStatement(sql);
|
||||
ResultSet rs = stmt.executeQuery()) {
|
||||
while (rs.next()) {
|
||||
FieldSchema fieldSchema = new FieldSchema(rs.getString("COLUMN_NAME"),
|
||||
rs.getString("TYPE_NAME"), rs.getString("COMMENT"));
|
||||
colsExcludePartitionKeys.add(fieldSchema);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new HMSClientException("Can not get schema of SD_ID = " + sdId);
|
||||
}
|
||||
return colsExcludePartitionKeys.build();
|
||||
}
|
||||
|
||||
/**
 * Table-level column statistics are not available through this JDBC-backed client.
 *
 * @throws HMSClientException always
 */
@Override
public List<ColumnStatisticsObj> getTableColumnStatistics(String dbName, String tblName, List<String> columns) {
    throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
}
|
||||
|
||||
/**
 * Partition-level column statistics are not available through this JDBC-backed
 * client. Marked "no use" by the original author — presumably no caller reaches
 * this path for JDBC catalogs; verify before relying on it.
 *
 * @throws HMSClientException always
 */
@Override
public Map<String, List<ColumnStatisticsObj>> getPartitionColumnStatistics(String dbName, String tblName,
        List<String> partNames, List<String> columns) {
    throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
}
|
||||
|
||||
/**
 * HMS notification events are not available through this JDBC-backed client.
 * Note: throws MetastoreNotificationFetchException, unlike the other
 * unsupported operations in this class which throw HMSClientException.
 *
 * @throws MetastoreNotificationFetchException always
 */
@Override
public CurrentNotificationEventId getCurrentNotificationEventId() {
    throw new MetastoreNotificationFetchException("Do not support in PostgreSQLJdbcHMSCachedClient.");
}
|
||||
|
||||
/**
 * HMS notification events are not available through this JDBC-backed client.
 *
 * @throws HMSClientException always
 */
@Override
public NotificationEventResponse getNextNotification(long lastEventId, int maxEvents, NotificationFilter filter) {
    throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
}
|
||||
|
||||
/**
 * Hive transactions are not available through this JDBC-backed client.
 *
 * @throws HMSClientException always
 */
@Override
public long openTxn(String user) {
    throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
}
|
||||
|
||||
/**
 * Hive transactions are not available through this JDBC-backed client.
 *
 * @throws HMSClientException always
 */
@Override
public void commitTxn(long txnId) {
    throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
}
|
||||
|
||||
/**
 * Hive ACID write-id listings are not available through this JDBC-backed client.
 *
 * @throws HMSClientException always
 */
@Override
public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) {
    throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
}
|
||||
|
||||
/**
 * Hive lock acquisition is not available through this JDBC-backed client.
 *
 * @throws HMSClientException always
 */
@Override
public void acquireSharedLock(String queryId, long txnId, String user, TableName tblName,
        List<String> partitionNames, long timeoutMs) {
    throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
}
|
||||
|
||||
/**
 * Inherited JDBC-client hook that is not used here; this class issues its own
 * hand-written queries against the HMS schema instead.
 *
 * @throws HMSClientException always
 */
@Override
protected String getDatabaseQuery() {
    throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
}
|
||||
|
||||
/**
 * Inherited JDBC-client type-mapping hook that is not used here; schemas are
 * returned as Hive {@code FieldSchema}s rather than Doris types.
 *
 * @throws HMSClientException always
 */
@Override
protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
    throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
}
|
||||
}
|
||||
@ -65,29 +65,33 @@ import java.util.Optional;
|
||||
import java.util.Queue;
|
||||
|
||||
/**
|
||||
* A hive metastore client pool for a specific catalog with hive configuration.
|
||||
* This class uses the thrift protocol to directly access the HiveMetaStore service
|
||||
* to obtain Hive metadata information
|
||||
*/
|
||||
public class PooledHiveMetaStoreClient {
|
||||
private static final Logger LOG = LogManager.getLogger(PooledHiveMetaStoreClient.class);
|
||||
public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
private static final Logger LOG = LogManager.getLogger(ThriftHMSCachedClient.class);
|
||||
|
||||
private static final HiveMetaHookLoader DUMMY_HOOK_LOADER = t -> null;
|
||||
// -1 means no limit on the partitions returned.
|
||||
private static final short MAX_LIST_PARTITION_NUM = Config.max_hive_list_partition_num;
|
||||
|
||||
private Queue<CachedClient> clientPool = new LinkedList<>();
|
||||
private Queue<ThriftHMSClient> clientPool = new LinkedList<>();
|
||||
private final int poolSize;
|
||||
private final HiveConf hiveConf;
|
||||
|
||||
public PooledHiveMetaStoreClient(HiveConf hiveConf, int pooSize) {
|
||||
Preconditions.checkArgument(pooSize > 0, pooSize);
|
||||
hiveConf.set(ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.name(),
|
||||
String.valueOf(Config.hive_metastore_client_timeout_second));
|
||||
public ThriftHMSCachedClient(HiveConf hiveConf, int poolSize) {
|
||||
Preconditions.checkArgument(poolSize > 0, poolSize);
|
||||
if (hiveConf != null) {
|
||||
hiveConf.set(ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.name(),
|
||||
String.valueOf(Config.hive_metastore_client_timeout_second));
|
||||
}
|
||||
this.hiveConf = hiveConf;
|
||||
this.poolSize = pooSize;
|
||||
this.poolSize = poolSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getAllDatabases() {
|
||||
try (CachedClient client = getClient()) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getAllDatabases();
|
||||
} catch (Exception e) {
|
||||
@ -99,8 +103,9 @@ public class PooledHiveMetaStoreClient {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getAllTables(String dbName) {
|
||||
try (CachedClient client = getClient()) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getAllTables(dbName);
|
||||
} catch (Exception e) {
|
||||
@ -112,8 +117,9 @@ public class PooledHiveMetaStoreClient {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tableExists(String dbName, String tblName) {
|
||||
try (CachedClient client = getClient()) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.tableExists(dbName, tblName);
|
||||
} catch (Exception e) {
|
||||
@ -125,14 +131,16 @@ public class PooledHiveMetaStoreClient {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> listPartitionNames(String dbName, String tblName) {
|
||||
return listPartitionNames(dbName, tblName, MAX_LIST_PARTITION_NUM);
|
||||
}
|
||||
|
||||
public List<String> listPartitionNames(String dbName, String tblName, long max) {
|
||||
@Override
|
||||
public List<String> listPartitionNames(String dbName, String tblName, long maxListPartitionNum) {
|
||||
// list all parts when the limit is greater than the short maximum
|
||||
short limited = max <= Short.MAX_VALUE ? (short) max : MAX_LIST_PARTITION_NUM;
|
||||
try (CachedClient client = getClient()) {
|
||||
short limited = maxListPartitionNum <= Short.MAX_VALUE ? (short) maxListPartitionNum : MAX_LIST_PARTITION_NUM;
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.listPartitionNames(dbName, tblName, limited);
|
||||
} catch (Exception e) {
|
||||
@ -144,8 +152,9 @@ public class PooledHiveMetaStoreClient {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Partition getPartition(String dbName, String tblName, List<String> partitionValues) {
|
||||
try (CachedClient client = getClient()) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getPartition(dbName, tblName, partitionValues);
|
||||
} catch (Exception e) {
|
||||
@ -158,8 +167,9 @@ public class PooledHiveMetaStoreClient {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Partition> getPartitions(String dbName, String tblName, List<String> partitionNames) {
|
||||
try (CachedClient client = getClient()) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getPartitionsByNames(dbName, tblName, partitionNames);
|
||||
} catch (Exception e) {
|
||||
@ -168,26 +178,13 @@ public class PooledHiveMetaStoreClient {
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new HMSClientException("failed to get partition for table %s in db %s with value %s", e, tblName,
|
||||
dbName, partitionNames);
|
||||
}
|
||||
}
|
||||
|
||||
public List<Partition> getPartitionsByFilter(String dbName, String tblName, String filter) {
|
||||
try (CachedClient client = getClient()) {
|
||||
try {
|
||||
return client.client.listPartitionsByFilter(dbName, tblName, filter, MAX_LIST_PARTITION_NUM);
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new HMSClientException("failed to get partition by filter for table %s in db %s", e, tblName,
|
||||
dbName);
|
||||
dbName, partitionNames);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Database getDatabase(String dbName) {
|
||||
try (CachedClient client = getClient()) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getDatabase(dbName);
|
||||
} catch (Exception e) {
|
||||
@ -199,8 +196,9 @@ public class PooledHiveMetaStoreClient {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Table getTable(String dbName, String tblName) {
|
||||
try (CachedClient client = getClient()) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getTable(dbName, tblName);
|
||||
} catch (Exception e) {
|
||||
@ -212,8 +210,9 @@ public class PooledHiveMetaStoreClient {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<FieldSchema> getSchema(String dbName, String tblName) {
|
||||
try (CachedClient client = getClient()) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getSchema(dbName, tblName);
|
||||
} catch (Exception e) {
|
||||
@ -225,8 +224,9 @@ public class PooledHiveMetaStoreClient {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ColumnStatisticsObj> getTableColumnStatistics(String dbName, String tblName, List<String> columns) {
|
||||
try (CachedClient client = getClient()) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getTableColumnStatistics(dbName, tblName, columns);
|
||||
} catch (Exception e) {
|
||||
@ -238,9 +238,10 @@ public class PooledHiveMetaStoreClient {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, List<ColumnStatisticsObj>> getPartitionColumnStatistics(
|
||||
String dbName, String tblName, List<String> partNames, List<String> columns) {
|
||||
try (CachedClient client = getClient()) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getPartitionColumnStatistics(dbName, tblName, partNames, columns);
|
||||
} catch (Exception e) {
|
||||
@ -252,8 +253,9 @@ public class PooledHiveMetaStoreClient {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CurrentNotificationEventId getCurrentNotificationEventId() {
|
||||
try (CachedClient client = getClient()) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getCurrentNotificationEventId();
|
||||
} catch (Exception e) {
|
||||
@ -267,11 +269,12 @@ public class PooledHiveMetaStoreClient {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public NotificationEventResponse getNextNotification(long lastEventId,
|
||||
int maxEvents,
|
||||
IMetaStoreClient.NotificationFilter filter)
|
||||
throws MetastoreNotificationFetchException {
|
||||
try (CachedClient client = getClient()) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getNextNotification(lastEventId, maxEvents, filter);
|
||||
} catch (Exception e) {
|
||||
@ -286,8 +289,9 @@ public class PooledHiveMetaStoreClient {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long openTxn(String user) {
|
||||
try (CachedClient client = getClient()) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.openTxn(user);
|
||||
} catch (Exception e) {
|
||||
@ -299,8 +303,9 @@ public class PooledHiveMetaStoreClient {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commitTxn(long txnId) {
|
||||
try (CachedClient client = getClient()) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
client.client.commitTxn(txnId);
|
||||
} catch (Exception e) {
|
||||
@ -312,6 +317,7 @@ public class PooledHiveMetaStoreClient {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void acquireSharedLock(String queryId, long txnId, String user, TableName tblName,
|
||||
List<String> partitionNames, long timeoutMs) {
|
||||
LockRequestBuilder request = new LockRequestBuilder(queryId).setTransactionId(txnId).setUser(user);
|
||||
@ -319,7 +325,7 @@ public class PooledHiveMetaStoreClient {
|
||||
for (LockComponent component : lockComponents) {
|
||||
request.addLockComponent(component);
|
||||
}
|
||||
try (CachedClient client = getClient()) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
LockResponse response;
|
||||
try {
|
||||
response = client.client.lock(request.build());
|
||||
@ -346,11 +352,13 @@ public class PooledHiveMetaStoreClient {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) {
|
||||
try (CachedClient client = getClient()) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
// Pass currentTxn as 0L to get the recent snapshot of valid transactions in Hive
|
||||
// Do not pass currentTransactionId instead as it will break Hive's listing of delta directories if major compaction
|
||||
// Do not pass currentTransactionId instead as
|
||||
// it will break Hive's listing of delta directories if major compaction
|
||||
// deletes delta directories for valid transactions that existed at the time transaction is opened
|
||||
ValidTxnList validTransactions = client.client.getValidTxns();
|
||||
List<TableValidWriteIds> tableValidWriteIdsList = client.client.getValidWriteIds(
|
||||
@ -362,11 +370,11 @@ public class PooledHiveMetaStoreClient {
|
||||
tableValidWriteIdsList);
|
||||
ValidWriteIdList writeIdList = validTxnWriteIdList.getTableValidWriteIdList(fullTableName);
|
||||
return writeIdList;
|
||||
} catch (Exception e) {
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
} catch (Exception e) {
|
||||
// Ignore this exception when the version of hive is not compatible with these apis.
|
||||
// Currently, the workaround is using a max watermark.
|
||||
LOG.warn("failed to get valid write ids for {}, transaction {}", fullTableName, currentTransactionId, e);
|
||||
@ -375,7 +383,7 @@ public class PooledHiveMetaStoreClient {
|
||||
}
|
||||
|
||||
private LockResponse checkLock(long lockId) {
|
||||
try (CachedClient client = getClient()) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.checkLock(lockId);
|
||||
} catch (Exception e) {
|
||||
@ -411,24 +419,11 @@ public class PooledHiveMetaStoreClient {
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public void heartbeatForTxn(long txnId) {
|
||||
try (CachedClient client = getClient()) {
|
||||
try {
|
||||
client.client.heartbeat(txnId, txnId);
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("failed to do heartbeat for transaction " + txnId, e);
|
||||
}
|
||||
}
|
||||
|
||||
private class CachedClient implements AutoCloseable {
|
||||
private class ThriftHMSClient implements AutoCloseable {
|
||||
private final IMetaStoreClient client;
|
||||
private volatile Throwable throwable;
|
||||
|
||||
private CachedClient(HiveConf hiveConf) throws MetaException {
|
||||
private ThriftHMSClient(HiveConf hiveConf) throws MetaException {
|
||||
String type = hiveConf.get(HMSProperties.HIVE_METASTORE_TYPE);
|
||||
if (HMSProperties.DLF_TYPE.equalsIgnoreCase(type)) {
|
||||
client = RetryingMetaStoreClient.getProxy(hiveConf, DUMMY_HOOK_LOADER,
|
||||
@ -458,14 +453,14 @@ public class PooledHiveMetaStoreClient {
|
||||
}
|
||||
}
|
||||
|
||||
private CachedClient getClient() throws MetaException {
|
||||
private ThriftHMSClient getClient() throws MetaException {
|
||||
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
|
||||
try {
|
||||
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
|
||||
synchronized (clientPool) {
|
||||
CachedClient client = clientPool.poll();
|
||||
ThriftHMSClient client = clientPool.poll();
|
||||
if (client == null) {
|
||||
return new CachedClient(hiveConf);
|
||||
return new ThriftHMSClient(hiveConf);
|
||||
}
|
||||
return client;
|
||||
}
|
||||
@ -18,6 +18,8 @@
|
||||
|
||||
package org.apache.doris.datasource.hive.event;
|
||||
|
||||
import org.apache.doris.datasource.hive.HMSCachedClient;
|
||||
|
||||
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -144,7 +146,7 @@ public abstract class MetastoreEvent {
|
||||
|
||||
/**
|
||||
* Process the information available in the NotificationEvent.
|
||||
* Better not to call (direct/indirect) apis of {@link org.apache.doris.datasource.hive.PooledHiveMetaStoreClient}
|
||||
* Better not to call (direct/indirect) apis of {@link HMSCachedClient}
|
||||
* during handling hms events (Reference to https://github.com/apache/doris/pull/19120).
|
||||
* Try to add some fallback strategies if it is highly necessary.
|
||||
*/
|
||||
|
||||
@ -154,7 +154,7 @@ public abstract class JdbcClient {
|
||||
}
|
||||
}
|
||||
|
||||
private static String parseDbType(String jdbcUrl) {
|
||||
public static String parseDbType(String jdbcUrl) {
|
||||
try {
|
||||
return JdbcResource.parseDbType(jdbcUrl);
|
||||
} catch (DdlException e) {
|
||||
|
||||
@ -18,9 +18,11 @@
|
||||
|
||||
package org.apache.doris.datasource.jdbc.client;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class JdbcClientConfig {
|
||||
public class JdbcClientConfig implements Cloneable {
|
||||
private String catalog;
|
||||
private String user;
|
||||
private String password;
|
||||
@ -29,8 +31,23 @@ public class JdbcClientConfig {
|
||||
private String driverClass;
|
||||
private String onlySpecifiedDatabase;
|
||||
private String isLowerCaseTableNames;
|
||||
private Map<String, Boolean> includeDatabaseMap;
|
||||
private Map<String, Boolean> excludeDatabaseMap;
|
||||
private Map<String, Boolean> includeDatabaseMap = Maps.newHashMap();
|
||||
private Map<String, Boolean> excludeDatabaseMap = Maps.newHashMap();
|
||||
private Map<String, String> customizedProperties = Maps.newHashMap();
|
||||
|
||||
@Override
|
||||
public JdbcClientConfig clone() {
|
||||
try {
|
||||
JdbcClientConfig cloned = (JdbcClientConfig) super.clone();
|
||||
|
||||
cloned.includeDatabaseMap = Maps.newHashMap(includeDatabaseMap);
|
||||
cloned.excludeDatabaseMap = Maps.newHashMap(excludeDatabaseMap);
|
||||
cloned.customizedProperties = Maps.newHashMap(customizedProperties);
|
||||
return cloned;
|
||||
} catch (CloneNotSupportedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public String getCatalog() {
|
||||
return catalog;
|
||||
@ -121,4 +138,12 @@ public class JdbcClientConfig {
|
||||
this.excludeDatabaseMap = excludeDatabaseMap;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void setCustomizedProperties(Map<String, String> customizedProperties) {
|
||||
this.customizedProperties = customizedProperties;
|
||||
}
|
||||
|
||||
public Map<String, String> getCustomizedProperties() {
|
||||
return customizedProperties;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user