[fix](multi-catalog)access HMS need ugiDoAs (#30595)
This commit is contained in:
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.datasource.hive;
|
||||
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.datasource.HMSClientException;
|
||||
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
|
||||
@ -56,6 +57,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.BitSet;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedList;
|
||||
@ -93,7 +95,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
public List<String> getAllDatabases() {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getAllDatabases();
|
||||
return ugiDoAs(client.client::getAllDatabases);
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
@ -107,7 +109,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
public List<String> getAllTables(String dbName) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getAllTables(dbName);
|
||||
return ugiDoAs(() -> client.client.getAllTables(dbName));
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
@ -121,7 +123,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
public boolean tableExists(String dbName, String tblName) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.tableExists(dbName, tblName);
|
||||
return ugiDoAs(() -> client.client.tableExists(dbName, tblName));
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
@ -142,7 +144,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
short limited = maxListPartitionNum <= Short.MAX_VALUE ? (short) maxListPartitionNum : MAX_LIST_PARTITION_NUM;
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.listPartitionNames(dbName, tblName, limited);
|
||||
return ugiDoAs(() -> client.client.listPartitionNames(dbName, tblName, limited));
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
@ -156,7 +158,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
public Partition getPartition(String dbName, String tblName, List<String> partitionValues) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getPartition(dbName, tblName, partitionValues);
|
||||
return ugiDoAs(() -> client.client.getPartition(dbName, tblName, partitionValues));
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
@ -171,7 +173,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
public List<Partition> getPartitions(String dbName, String tblName, List<String> partitionNames) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getPartitionsByNames(dbName, tblName, partitionNames);
|
||||
return ugiDoAs(() -> client.client.getPartitionsByNames(dbName, tblName, partitionNames));
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
@ -186,7 +188,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
public Database getDatabase(String dbName) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getDatabase(dbName);
|
||||
return ugiDoAs(() -> client.client.getDatabase(dbName));
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
@ -200,7 +202,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
public Table getTable(String dbName, String tblName) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getTable(dbName, tblName);
|
||||
return ugiDoAs(() -> client.client.getTable(dbName, tblName));
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
@ -214,7 +216,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
public List<FieldSchema> getSchema(String dbName, String tblName) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getSchema(dbName, tblName);
|
||||
return ugiDoAs(() -> client.client.getSchema(dbName, tblName));
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
@ -228,7 +230,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
public List<ColumnStatisticsObj> getTableColumnStatistics(String dbName, String tblName, List<String> columns) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getTableColumnStatistics(dbName, tblName, columns);
|
||||
return ugiDoAs(() -> client.client.getTableColumnStatistics(dbName, tblName, columns));
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
@ -243,7 +245,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
String dbName, String tblName, List<String> partNames, List<String> columns) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getPartitionColumnStatistics(dbName, tblName, partNames, columns);
|
||||
return ugiDoAs(() -> client.client.getPartitionColumnStatistics(dbName, tblName, partNames, columns));
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
@ -257,7 +259,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
public CurrentNotificationEventId getCurrentNotificationEventId() {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getCurrentNotificationEventId();
|
||||
return ugiDoAs(client.client::getCurrentNotificationEventId);
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
@ -271,12 +273,12 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
|
||||
@Override
|
||||
public NotificationEventResponse getNextNotification(long lastEventId,
|
||||
int maxEvents,
|
||||
IMetaStoreClient.NotificationFilter filter)
|
||||
int maxEvents,
|
||||
IMetaStoreClient.NotificationFilter filter)
|
||||
throws MetastoreNotificationFetchException {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.getNextNotification(lastEventId, maxEvents, filter);
|
||||
return ugiDoAs(() -> client.client.getNextNotification(lastEventId, maxEvents, filter));
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
@ -293,7 +295,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
public long openTxn(String user) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.openTxn(user);
|
||||
return ugiDoAs(() -> client.client.openTxn(user));
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
@ -307,7 +309,10 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
public void commitTxn(long txnId) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
client.client.commitTxn(txnId);
|
||||
ugiDoAs(() -> {
|
||||
client.client.commitTxn(txnId);
|
||||
return null;
|
||||
});
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
@ -319,7 +324,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
|
||||
@Override
|
||||
public void acquireSharedLock(String queryId, long txnId, String user, TableName tblName,
|
||||
List<String> partitionNames, long timeoutMs) {
|
||||
List<String> partitionNames, long timeoutMs) {
|
||||
LockRequestBuilder request = new LockRequestBuilder(queryId).setTransactionId(txnId).setUser(user);
|
||||
List<LockComponent> lockComponents = createLockComponentsForRead(tblName, partitionNames);
|
||||
for (LockComponent component : lockComponents) {
|
||||
@ -328,7 +333,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
LockResponse response;
|
||||
try {
|
||||
response = client.client.lock(request.build());
|
||||
response = ugiDoAs(() -> client.client.lock(request.build()));
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
@ -356,20 +361,22 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
// Pass currentTxn as 0L to get the recent snapshot of valid transactions in Hive
|
||||
// Do not pass currentTransactionId instead as
|
||||
// it will break Hive's listing of delta directories if major compaction
|
||||
// deletes delta directories for valid transactions that existed at the time transaction is opened
|
||||
ValidTxnList validTransactions = client.client.getValidTxns();
|
||||
List<TableValidWriteIds> tableValidWriteIdsList = client.client.getValidWriteIds(
|
||||
Collections.singletonList(fullTableName), validTransactions.toString());
|
||||
if (tableValidWriteIdsList.size() != 1) {
|
||||
throw new Exception("tableValidWriteIdsList's size should be 1");
|
||||
}
|
||||
ValidTxnWriteIdList validTxnWriteIdList = TxnUtils.createValidTxnWriteIdList(currentTransactionId,
|
||||
tableValidWriteIdsList);
|
||||
ValidWriteIdList writeIdList = validTxnWriteIdList.getTableValidWriteIdList(fullTableName);
|
||||
return writeIdList;
|
||||
return ugiDoAs(() -> {
|
||||
// Pass currentTxn as 0L to get the recent snapshot of valid transactions in Hive
|
||||
// Do not pass currentTransactionId instead as
|
||||
// it will break Hive's listing of delta directories if major compaction
|
||||
// deletes delta directories for valid transactions that existed at the time transaction is opened
|
||||
ValidTxnList validTransactions = client.client.getValidTxns();
|
||||
List<TableValidWriteIds> tableValidWriteIdsList = client.client.getValidWriteIds(
|
||||
Collections.singletonList(fullTableName), validTransactions.toString());
|
||||
if (tableValidWriteIdsList.size() != 1) {
|
||||
throw new Exception("tableValidWriteIdsList's size should be 1");
|
||||
}
|
||||
ValidTxnWriteIdList validTxnWriteIdList = TxnUtils.createValidTxnWriteIdList(currentTransactionId,
|
||||
tableValidWriteIdsList);
|
||||
ValidWriteIdList writeIdList = validTxnWriteIdList.getTableValidWriteIdList(fullTableName);
|
||||
return writeIdList;
|
||||
});
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
@ -385,7 +392,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
private LockResponse checkLock(long lockId) {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
try {
|
||||
return client.client.checkLock(lockId);
|
||||
return ugiDoAs(() -> client.client.checkLock(lockId));
|
||||
} catch (Exception e) {
|
||||
client.setThrowable(e);
|
||||
throw e;
|
||||
@ -460,7 +467,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
synchronized (clientPool) {
|
||||
ThriftHMSClient client = clientPool.poll();
|
||||
if (client == null) {
|
||||
return new ThriftHMSClient(hiveConf);
|
||||
return ugiDoAs(() -> new ThriftHMSClient(hiveConf));
|
||||
}
|
||||
return client;
|
||||
}
|
||||
@ -468,5 +475,9 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
Thread.currentThread().setContextClassLoader(classLoader);
|
||||
}
|
||||
}
|
||||
|
||||
private <T> T ugiDoAs(PrivilegedExceptionAction<T> action) {
|
||||
return HiveMetaStoreClientHelper.ugiDoAs(hiveConf, action);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user