[fix](kerberos)fix and refactor ugi login for kerberos and simple authentication (#38607)
pick from (#37301)
This commit is contained in:
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.datasource.hive;
|
||||
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
|
||||
import org.apache.doris.datasource.DatabaseMetadata;
|
||||
import org.apache.doris.datasource.TableMetadata;
|
||||
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
|
||||
@ -113,6 +114,10 @@ public interface HMSCachedClient {
|
||||
|
||||
void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData);
|
||||
|
||||
default void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator) {
|
||||
// Ignored by default
|
||||
}
|
||||
|
||||
/**
|
||||
* close the connection, eg, to hms
|
||||
*/
|
||||
|
||||
@ -24,7 +24,7 @@ import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
import org.apache.doris.common.security.authentication.AuthenticationConfig;
|
||||
import org.apache.doris.common.security.authentication.HadoopUGI;
|
||||
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
|
||||
import org.apache.doris.datasource.CatalogProperty;
|
||||
import org.apache.doris.datasource.ExternalCatalog;
|
||||
import org.apache.doris.datasource.ExternalDatabase;
|
||||
@ -40,7 +40,9 @@ import org.apache.doris.fs.FileSystemProviderImpl;
|
||||
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
|
||||
import org.apache.doris.transaction.TransactionManagerFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Strings;
|
||||
import lombok.Getter;
|
||||
import org.apache.commons.lang3.math.NumberUtils;
|
||||
import org.apache.hadoop.hive.conf.HiveConf;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -68,7 +70,10 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
|
||||
private static final int FILE_SYSTEM_EXECUTOR_THREAD_NUM = 16;
|
||||
private ThreadPoolExecutor fileSystemExecutor;
|
||||
@Getter
|
||||
private HadoopAuthenticator authenticator;
|
||||
|
||||
@VisibleForTesting
|
||||
public HMSExternalCatalog() {
|
||||
catalogProperty = new CatalogProperty(null, null);
|
||||
}
|
||||
@ -81,6 +86,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
super(catalogId, name, InitCatalogLog.Type.HMS, comment);
|
||||
props = PropertyConverter.convertToMetaProperties(props);
|
||||
catalogProperty = new CatalogProperty(resource, props);
|
||||
AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration());
|
||||
authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -148,9 +155,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
}
|
||||
hiveConf.set(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.name(),
|
||||
String.valueOf(Config.hive_metastore_client_timeout_second));
|
||||
HadoopUGI.tryKrbLogin(this.getName(), AuthenticationConfig.getKerberosConfig(hiveConf,
|
||||
AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
|
||||
AuthenticationConfig.HADOOP_KERBEROS_KEYTAB));
|
||||
}
|
||||
HiveMetadataOps hiveOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
|
||||
FileSystemProvider fileSystemProvider = new FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(),
|
||||
|
||||
@ -33,6 +33,7 @@ import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.info.SimpleTableInfo;
|
||||
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
|
||||
import org.apache.doris.datasource.ExternalDatabase;
|
||||
import org.apache.doris.datasource.jdbc.client.JdbcClient;
|
||||
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
|
||||
@ -61,11 +62,14 @@ public class HiveMetadataOps implements ExternalMetadataOps {
|
||||
private static final int MIN_CLIENT_POOL_SIZE = 8;
|
||||
private final HMSCachedClient client;
|
||||
private final HMSExternalCatalog catalog;
|
||||
private HadoopAuthenticator hadoopAuthenticator;
|
||||
|
||||
public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMSExternalCatalog catalog) {
|
||||
this(catalog, createCachedClient(hiveConf,
|
||||
Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size),
|
||||
jdbcClientConfig));
|
||||
hadoopAuthenticator = catalog.getAuthenticator();
|
||||
client.setHadoopAuthenticator(hadoopAuthenticator);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@ -85,7 +89,8 @@ public class HiveMetadataOps implements ExternalMetadataOps {
|
||||
private static HMSCachedClient createCachedClient(HiveConf hiveConf, int thriftClientPoolSize,
|
||||
JdbcClientConfig jdbcClientConfig) {
|
||||
if (hiveConf != null) {
|
||||
return new ThriftHMSCachedClient(hiveConf, thriftClientPoolSize);
|
||||
ThriftHMSCachedClient client = new ThriftHMSCachedClient(hiveConf, thriftClientPoolSize);
|
||||
return client;
|
||||
}
|
||||
Preconditions.checkNotNull(jdbcClientConfig, "hiveConf and jdbcClientConfig are both null");
|
||||
String dbType = JdbcClient.parseDbType(jdbcClientConfig.getJdbcUrl());
|
||||
|
||||
@ -20,6 +20,7 @@ package org.apache.doris.datasource.hive;
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
|
||||
import org.apache.doris.datasource.DatabaseMetadata;
|
||||
import org.apache.doris.datasource.TableMetadata;
|
||||
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
|
||||
@ -92,6 +93,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
private boolean isClosed = false;
|
||||
private final int poolSize;
|
||||
private final HiveConf hiveConf;
|
||||
private HadoopAuthenticator hadoopAuthenticator;
|
||||
|
||||
public ThriftHMSCachedClient(HiveConf hiveConf, int poolSize) {
|
||||
Preconditions.checkArgument(poolSize > 0, poolSize);
|
||||
@ -104,6 +106,10 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
this.isClosed = false;
|
||||
}
|
||||
|
||||
public void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator) {
|
||||
this.hadoopAuthenticator = hadoopAuthenticator;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
synchronized (clientPool) {
|
||||
@ -678,7 +684,11 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
}
|
||||
|
||||
private <T> T ugiDoAs(PrivilegedExceptionAction<T> action) {
|
||||
return HiveMetaStoreClientHelper.ugiDoAs(hiveConf, action);
|
||||
try {
|
||||
return hadoopAuthenticator.doAs(action);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
@ -56,7 +57,7 @@ public abstract class RemoteFileSystem extends PersistentFileSystem {
|
||||
try {
|
||||
org.apache.hadoop.fs.FileSystem fileSystem = nativeFileSystem(remotePath);
|
||||
Path locatedPath = new Path(remotePath);
|
||||
RemoteIterator<LocatedFileStatus> locatedFiles = fileSystem.listFiles(locatedPath, recursive);
|
||||
RemoteIterator<LocatedFileStatus> locatedFiles = getLocatedFiles(recursive, fileSystem, locatedPath);
|
||||
while (locatedFiles.hasNext()) {
|
||||
LocatedFileStatus fileStatus = locatedFiles.next();
|
||||
RemoteFile location = new RemoteFile(
|
||||
@ -72,11 +73,16 @@ public abstract class RemoteFileSystem extends PersistentFileSystem {
|
||||
return Status.OK;
|
||||
}
|
||||
|
||||
protected RemoteIterator<LocatedFileStatus> getLocatedFiles(boolean recursive,
|
||||
FileSystem fileSystem, Path locatedPath) throws IOException {
|
||||
return fileSystem.listFiles(locatedPath, recursive);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status listDirectories(String remotePath, Set<String> result) {
|
||||
try {
|
||||
FileSystem fileSystem = nativeFileSystem(remotePath);
|
||||
FileStatus[] fileStatuses = fileSystem.listStatus(new Path(remotePath));
|
||||
FileStatus[] fileStatuses = getFileStatuses(remotePath, fileSystem);
|
||||
result.addAll(
|
||||
Arrays.stream(fileStatuses)
|
||||
.filter(FileStatus::isDirectory)
|
||||
@ -88,6 +94,10 @@ public abstract class RemoteFileSystem extends PersistentFileSystem {
|
||||
return Status.OK;
|
||||
}
|
||||
|
||||
protected FileStatus[] getFileStatuses(String remotePath, FileSystem fileSystem) throws IOException {
|
||||
return fileSystem.listStatus(new Path(remotePath));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status renameDir(String origFilePath,
|
||||
String destFilePath,
|
||||
|
||||
@ -21,7 +21,7 @@ import org.apache.doris.analysis.StorageBackend;
|
||||
import org.apache.doris.backup.Status;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.security.authentication.AuthenticationConfig;
|
||||
import org.apache.doris.common.security.authentication.HadoopUGI;
|
||||
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
|
||||
import org.apache.doris.common.util.URI;
|
||||
import org.apache.doris.fs.operations.HDFSFileOperations;
|
||||
import org.apache.doris.fs.operations.HDFSOpParams;
|
||||
@ -35,7 +35,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -58,8 +60,8 @@ public class DFSFileSystem extends RemoteFileSystem {
|
||||
|
||||
public static final String PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH = "ipc.client.fallback-to-simple-auth-allowed";
|
||||
private static final Logger LOG = LogManager.getLogger(DFSFileSystem.class);
|
||||
|
||||
private HDFSFileOperations operations = null;
|
||||
private HadoopAuthenticator authenticator = null;
|
||||
|
||||
public DFSFileSystem(Map<String, String> properties) {
|
||||
this(StorageBackend.StorageType.HDFS, properties);
|
||||
@ -80,21 +82,35 @@ public class DFSFileSystem extends RemoteFileSystem {
|
||||
for (Map.Entry<String, String> propEntry : properties.entrySet()) {
|
||||
conf.set(propEntry.getKey(), propEntry.getValue());
|
||||
}
|
||||
|
||||
dfsFileSystem = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> {
|
||||
try {
|
||||
return FileSystem.get(new Path(remotePath).toUri(), conf);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(conf);
|
||||
authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig);
|
||||
try {
|
||||
dfsFileSystem = authenticator.doAs(() -> {
|
||||
try {
|
||||
return FileSystem.get(new Path(remotePath).toUri(), conf);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
throw new UserException(e);
|
||||
}
|
||||
operations = new HDFSFileOperations(dfsFileSystem);
|
||||
}
|
||||
}
|
||||
}
|
||||
operations = new HDFSFileOperations(dfsFileSystem);
|
||||
return dfsFileSystem;
|
||||
}
|
||||
|
||||
protected RemoteIterator<LocatedFileStatus> getLocatedFiles(boolean recursive,
|
||||
FileSystem fileSystem, Path locatedPath) throws IOException {
|
||||
return authenticator.doAs(() -> fileSystem.listFiles(locatedPath, recursive));
|
||||
}
|
||||
|
||||
protected FileStatus[] getFileStatuses(String remotePath, FileSystem fileSystem) throws IOException {
|
||||
return authenticator.doAs(() -> fileSystem.listStatus(new Path(remotePath)));
|
||||
}
|
||||
|
||||
public static Configuration getHdfsConf(boolean fallbackToSimpleAuth) {
|
||||
Configuration hdfsConf = new HdfsConfiguration();
|
||||
if (fallbackToSimpleAuth) {
|
||||
@ -266,7 +282,7 @@ public class DFSFileSystem extends RemoteFileSystem {
|
||||
URI pathUri = URI.create(remotePath);
|
||||
Path inputFilePath = new Path(pathUri.getPath());
|
||||
FileSystem fileSystem = nativeFileSystem(remotePath);
|
||||
boolean isPathExist = fileSystem.exists(inputFilePath);
|
||||
boolean isPathExist = authenticator.doAs(() -> fileSystem.exists(inputFilePath));
|
||||
if (!isPathExist) {
|
||||
return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath);
|
||||
}
|
||||
@ -381,7 +397,7 @@ public class DFSFileSystem extends RemoteFileSystem {
|
||||
FileSystem fileSystem = nativeFileSystem(destPath);
|
||||
Path srcfilePath = new Path(srcPathUri.getPath());
|
||||
Path destfilePath = new Path(destPathUri.getPath());
|
||||
boolean isRenameSuccess = fileSystem.rename(srcfilePath, destfilePath);
|
||||
boolean isRenameSuccess = authenticator.doAs(() -> fileSystem.rename(srcfilePath, destfilePath));
|
||||
if (!isRenameSuccess) {
|
||||
return new Status(Status.ErrCode.COMMON_ERROR, "failed to rename " + srcPath + " to " + destPath);
|
||||
}
|
||||
@ -402,7 +418,7 @@ public class DFSFileSystem extends RemoteFileSystem {
|
||||
URI pathUri = URI.create(remotePath);
|
||||
Path inputFilePath = new Path(pathUri.getPath());
|
||||
FileSystem fileSystem = nativeFileSystem(remotePath);
|
||||
fileSystem.delete(inputFilePath, true);
|
||||
authenticator.doAs(() -> fileSystem.delete(inputFilePath, true));
|
||||
} catch (UserException e) {
|
||||
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
|
||||
} catch (IOException e) {
|
||||
@ -428,7 +444,7 @@ public class DFSFileSystem extends RemoteFileSystem {
|
||||
URI pathUri = URI.create(remotePath);
|
||||
FileSystem fileSystem = nativeFileSystem(remotePath);
|
||||
Path pathPattern = new Path(pathUri.getPath());
|
||||
FileStatus[] files = fileSystem.globStatus(pathPattern);
|
||||
FileStatus[] files = authenticator.doAs(() -> fileSystem.globStatus(pathPattern));
|
||||
if (files == null) {
|
||||
LOG.info("no files in path " + remotePath);
|
||||
return Status.OK;
|
||||
@ -455,7 +471,7 @@ public class DFSFileSystem extends RemoteFileSystem {
|
||||
public Status makeDir(String remotePath) {
|
||||
try {
|
||||
FileSystem fileSystem = nativeFileSystem(remotePath);
|
||||
if (!fileSystem.mkdirs(new Path(remotePath))) {
|
||||
if (!authenticator.doAs(() -> fileSystem.mkdirs(new Path(remotePath)))) {
|
||||
LOG.warn("failed to make dir for " + remotePath);
|
||||
return new Status(Status.ErrCode.COMMON_ERROR, "failed to make dir for " + remotePath);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user