[fix](paimon) fix hadoop.username does not take effect in paimon catalog (#31478)
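The change consolidates the scattered `HadoopUGI.loginWithUGI(...)` + `ugi.doAs(...)` call sites behind a single `HadoopUGI.ugiDoAs(...)` helper and, crucially, makes the Paimon catalog resolve an `AuthenticationConfig` and route its catalog accesses through that helper, so a configured `hadoop.username` is actually applied. Below is a minimal sketch of what a `ugiDoAs`-style helper does, reconstructed from the inlined logic this commit deletes; the real signature in `HadoopUGI` is not shown in this diff, and `AuthenticationConfig`/`HadoopUGI` are the Doris types referenced by the diff:

```java
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.security.UserGroupInformation;

public class UgiDoAsSketch {
    // Reconstructed from the removed code below; illustrative, not the Doris source.
    public static <T> T ugiDoAs(AuthenticationConfig authConf, PrivilegedExceptionAction<T> action) {
        UserGroupInformation ugi = HadoopUGI.loginWithUGI(authConf);
        try {
            if (ugi != null) {
                ugi.checkTGTAndReloginFromKeytab(); // refresh the Kerberos TGT if it expired
                return ugi.doAs(action);            // run the action as the logged-in identity
            } else {
                return action.run();                // no login configured: run as the process user
            }
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}
```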
@@ -71,7 +71,6 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
@@ -79,7 +78,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import shade.doris.hive.org.apache.thrift.TException;
 
 import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -828,39 +826,14 @@ public class HiveMetaStoreClientHelper {
                     AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
                     AuthenticationConfig.HADOOP_KERBEROS_KEYTAB);
         }
-        UserGroupInformation ugi = HadoopUGI.loginWithUGI(krbConfig);
-        try {
-            if (ugi != null) {
-                ugi.checkTGTAndReloginFromKeytab();
-                return ugi.doAs(action);
-            } else {
-                return action.run();
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e.getMessage(), e);
-        }
+        return HadoopUGI.ugiDoAs(krbConfig, action);
     }
 
     public static HoodieTableMetaClient getHudiClient(HMSExternalTable table) {
         String hudiBasePath = table.getRemoteTable().getSd().getLocation();
-
         Configuration conf = getConfiguration(table);
-        UserGroupInformation ugi = HadoopUGI.loginWithUGI(AuthenticationConfig.getKerberosConfig(conf));
-        HoodieTableMetaClient metaClient;
-        if (ugi != null) {
-            try {
-                metaClient = ugi.doAs(
-                        (PrivilegedExceptionAction<HoodieTableMetaClient>) () -> HoodieTableMetaClient.builder()
-                                .setConf(conf).setBasePath(hudiBasePath).build());
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            } catch (InterruptedException e) {
-                throw new RuntimeException("Cannot get hudi client.", e);
-            }
-        } else {
-            metaClient = HoodieTableMetaClient.builder().setConf(conf).setBasePath(hudiBasePath).build();
-        }
-        return metaClient;
+        return HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf),
+                () -> HoodieTableMetaClient.builder().setConf(conf).setBasePath(hudiBasePath).build());
     }
 
     public static Configuration getConfiguration(HMSExternalTable table) {
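For context on why this fixes `hadoop.username`: `AuthenticationConfig.getKerberosConfig(conf, ...)` is expected to fall back to a simple-authentication config when no Kerberos principal/keytab is configured, and that fallback carries the configured Hadoop user. A hypothetical sketch of that selection (the concrete config class names and the exact fallback logic are assumptions, not shown in this diff):

```java
import org.apache.hadoop.conf.Configuration;

public class AuthConfigSketch {
    // Hypothetical selection logic; illustrative only.
    public static AuthenticationConfig getKerberosConfig(Configuration conf,
                                                         String principalKey, String keytabKey) {
        if ("kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))) {
            // Kerberos is on: the identity comes from principal + keytab.
            return new KerberosAuthenticationConfig(conf.get(principalKey), conf.get(keytabKey));
        }
        // Otherwise simple auth: this is the path where hadoop.username takes effect.
        return new SimpleAuthenticationConfig(conf.get("hadoop.username"));
    }
}
```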
@@ -18,6 +18,8 @@
 package org.apache.doris.datasource.paimon;
 
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.security.authentication.AuthenticationConfig;
+import org.apache.doris.common.security.authentication.HadoopUGI;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InitCatalogLog;
 import org.apache.doris.datasource.SessionContext;
@@ -25,12 +27,15 @@ import org.apache.doris.datasource.property.constants.PaimonProperties;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.FileSystemCatalog;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.hive.HiveCatalog;
 import org.apache.paimon.options.Options;
 
 import java.util.ArrayList;
@@ -44,6 +49,7 @@ public abstract class PaimonExternalCatalog extends ExternalCatalog {
     public static final String PAIMON_HMS = "hms";
     protected String catalogType;
     protected Catalog catalog;
+    protected AuthenticationConfig authConf;
 
     private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
             PaimonProperties.WAREHOUSE
@@ -54,13 +60,20 @@ public abstract class PaimonExternalCatalog extends ExternalCatalog {
     }
 
     @Override
-    protected void init() {
-        super.init();
-    }
-
-    public Catalog getCatalog() {
-        makeSureInitialized();
-        return catalog;
+    protected void initLocalObjectsImpl() {
+        Configuration conf = new Configuration();
+        for (Map.Entry<String, String> propEntry : this.catalogProperty.getHadoopProperties().entrySet()) {
+            conf.set(propEntry.getKey(), propEntry.getValue());
+        }
+        if (catalog instanceof FileSystemCatalog) {
+            authConf = AuthenticationConfig.getKerberosConfig(conf,
+                    AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
+                    AuthenticationConfig.HADOOP_KERBEROS_KEYTAB);
+        } else if (catalog instanceof HiveCatalog) {
+            authConf = AuthenticationConfig.getKerberosConfig(conf,
+                    AuthenticationConfig.HIVE_KERBEROS_PRINCIPAL,
+                    AuthenticationConfig.HIVE_KERBEROS_KEYTAB);
+        }
     }
 
     public String getCatalogType() {
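`authConf` is resolved once at catalog initialization: Hive-specific principal/keytab keys for a `HiveCatalog`, the generic Hadoop keys for a `FileSystemCatalog`. When neither yields a Kerberos login, the simple-auth identity is typically materialized with Hadoop's `UserGroupInformation.createRemoteUser`; the following is a sketch of that login path (an assumption about what `HadoopUGI.loginWithUGI` does, which this diff does not show):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class SimpleAuthSketch {
    // Sketch: how a simple-auth login plausibly turns hadoop.username into a UGI.
    public static UserGroupInformation login(Configuration conf) {
        String hadoopUserName = conf.get("hadoop.username", System.getProperty("user.name"));
        // Subsequent ugi.doAs(...) calls then access HDFS as hadoopUserName.
        return UserGroupInformation.createRemoteUser(hadoopUserName);
    }
}
```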
@@ -69,36 +82,40 @@
     }
 
     protected List<String> listDatabaseNames() {
-        return new ArrayList<>(catalog.listDatabases());
+        return HadoopUGI.ugiDoAs(authConf, () -> new ArrayList<>(catalog.listDatabases()));
     }
 
     @Override
     public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
         makeSureInitialized();
-        return catalog.tableExists(Identifier.create(dbName, tblName));
+        return HadoopUGI.ugiDoAs(authConf, () -> catalog.tableExists(Identifier.create(dbName, tblName)));
     }
 
     @Override
     public List<String> listTableNames(SessionContext ctx, String dbName) {
         makeSureInitialized();
-        List<String> tableNames = null;
-        try {
-            tableNames = catalog.listTables(dbName);
-        } catch (Catalog.DatabaseNotExistException e) {
-            LOG.warn("DatabaseNotExistException", e);
-        }
-        return tableNames;
+        return HadoopUGI.ugiDoAs(authConf, () -> {
+            List<String> tableNames = null;
+            try {
+                tableNames = catalog.listTables(dbName);
+            } catch (Catalog.DatabaseNotExistException e) {
+                LOG.warn("DatabaseNotExistException", e);
+            }
+            return tableNames;
+        });
     }
 
     public org.apache.paimon.table.Table getPaimonTable(String dbName, String tblName) {
         makeSureInitialized();
-        org.apache.paimon.table.Table table = null;
-        try {
-            table = catalog.getTable(Identifier.create(dbName, tblName));
-        } catch (Catalog.TableNotExistException e) {
-            LOG.warn("TableNotExistException", e);
-        }
-        return table;
+        return HadoopUGI.ugiDoAs(authConf, () -> {
+            org.apache.paimon.table.Table table = null;
+            try {
+                table = catalog.getTable(Identifier.create(dbName, tblName));
+            } catch (Catalog.TableNotExistException e) {
+                LOG.warn("TableNotExistException", e);
+            }
+            return table;
+        });
     }
 
     protected String getPaimonCatalogType(String catalogType) {
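Note the pattern above: every Paimon catalog access (`listDatabases`, `tableExists`, `listTables`, `getTable`) is now funneled through `HadoopUGI.ugiDoAs(authConf, ...)`. For a filesystem-based Paimon catalog these calls are ultimately directory and file reads under the warehouse path, so running them under the resolved identity is what finally makes `hadoop.username` (or a Kerberos principal) take effect.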
@@ -42,6 +42,7 @@ public class PaimonFileExternalCatalog extends PaimonExternalCatalog {
     protected void initLocalObjectsImpl() {
         catalogType = PAIMON_FILESYSTEM;
         catalog = createCatalog();
+        super.initLocalObjectsImpl();
     }
 
     @Override
@@ -37,7 +37,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -51,7 +50,6 @@ import java.nio.ByteBuffer;
 import java.nio.file.FileVisitOption;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.security.PrivilegedAction;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -82,18 +80,13 @@ public class DFSFileSystem extends RemoteFileSystem {
             conf.set(propEntry.getKey(), propEntry.getValue());
         }
 
-        UserGroupInformation ugi = HadoopUGI.loginWithUGI(AuthenticationConfig.getKerberosConfig(conf));
-        try {
-            dfsFileSystem = ugi.doAs((PrivilegedAction<FileSystem>) () -> {
-                try {
-                    return FileSystem.get(new Path(remotePath).toUri(), conf);
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            });
-        } catch (SecurityException e) {
-            throw new UserException(e);
-        }
+        dfsFileSystem = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> {
+            try {
+                return FileSystem.get(new Path(remotePath).toUri(), conf);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
 
         Preconditions.checkNotNull(dfsFileSystem);
         operations = new HDFSFileOperations(dfsFileSystem);
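One detail in the `DFSFileSystem` change: the new code keeps the inner `try/catch` from the old version, wrapping the checked `IOException` from `FileSystem.get` in a `RuntimeException` inside the action rather than letting it escape as a checked exception. A small helper of the following shape (hypothetical, not part of this commit) makes that common wrapping pattern explicit:

```java
import java.io.IOException;
import java.io.UncheckedIOException;

public final class UncheckedIo {
    @FunctionalInterface
    public interface IoSupplier<T> {
        T get() throws IOException;
    }

    // Wraps the checked IOException so the body fits non-throwing functional interfaces.
    public static <T> T get(IoSupplier<T> body) {
        try {
            return body.get();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```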
@@ -474,8 +474,8 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
 
         if (getTFileType() == TFileType.FILE_HDFS) {
             THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
-            String fsNmae = getLocationProperties().get(HdfsResource.HADOOP_FS_NAME);
-            tHdfsParams.setFsName(fsNmae);
+            String fsName = getLocationProperties().get(HdfsResource.HADOOP_FS_NAME);
+            tHdfsParams.setFsName(fsName);
             fileScanRangeParams.setHdfsParams(tHdfsParams);
         }