[Fix](kerberos) Fix kerberos relogin bugs when using hdfs-load. (#24490)
This commit is contained in:
@ -82,49 +82,47 @@ public class DFSFileSystem extends RemoteFileSystem {
|
||||
conf.set(propEntry.getKey(), propEntry.getValue());
|
||||
}
|
||||
|
||||
boolean hasRelogin = false;
|
||||
UserGroupInformation ugi;
|
||||
UserGroupInformation ugi = login(conf);
|
||||
try {
|
||||
// try use current ugi first to avoid relogin
|
||||
// because it may be a time-consuming task
|
||||
ugi = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("An IOException occurs when invoke "
|
||||
+ "UserGroupInformation.getCurrentUser(), relogin immediately.", e);
|
||||
ugi = doLogin(conf);
|
||||
hasRelogin = true;
|
||||
}
|
||||
|
||||
do {
|
||||
try {
|
||||
dfsFileSystem = ugi.doAs((PrivilegedAction<FileSystem>) () -> {
|
||||
try {
|
||||
String username = properties.get(HdfsResource.HADOOP_USER_NAME);
|
||||
return username == null
|
||||
? FileSystem.get(new Path(remotePath).toUri(), conf)
|
||||
: FileSystem.get(new Path(remotePath).toUri(), conf, username);
|
||||
} catch (IOException | InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
LOG.debug("Reuse current ugi for dfs, remote path: {}", remotePath);
|
||||
break;
|
||||
} catch (SecurityException e) {
|
||||
LOG.warn("A SecurityException occurs when invoke ugi.doAs(), "
|
||||
+ "relogin and retry immediately.", e);
|
||||
if (hasRelogin) {
|
||||
throw new UserException(e);
|
||||
dfsFileSystem = ugi.doAs((PrivilegedAction<FileSystem>) () -> {
|
||||
try {
|
||||
return FileSystem.get(new Path(remotePath).toUri(), conf);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
ugi = doLogin(conf);
|
||||
hasRelogin = true;
|
||||
}
|
||||
} while (true);
|
||||
});
|
||||
} catch (SecurityException e) {
|
||||
throw new UserException(e);
|
||||
}
|
||||
|
||||
Preconditions.checkNotNull(dfsFileSystem);
|
||||
operations = new HDFSFileOperations(dfsFileSystem);
|
||||
return dfsFileSystem;
|
||||
}
|
||||
|
||||
private UserGroupInformation login(Configuration conf) throws UserException {
|
||||
if (AuthType.KERBEROS.getDesc().equals(
|
||||
conf.get(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, null))) {
|
||||
try {
|
||||
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
|
||||
String principal = conf.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL);
|
||||
LOG.debug("Current login user: {}", ugi.getUserName());
|
||||
if (ugi.hasKerberosCredentials() && ugi.getUserName().equals(principal)) {
|
||||
// if the current user is logged by kerberos and is the same user
|
||||
// just use checkTGTAndReloginFromKeytab because this method will only relogin
|
||||
// when the TGT is expired or is close to expiry
|
||||
ugi.checkTGTAndReloginFromKeytab();
|
||||
return ugi;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("A SecurityException occurs with kerberos, do login immediately.", e);
|
||||
return doLogin(conf);
|
||||
}
|
||||
}
|
||||
|
||||
return doLogin(conf);
|
||||
}
|
||||
|
||||
private UserGroupInformation doLogin(Configuration conf) throws UserException {
|
||||
if (AuthType.KERBEROS.getDesc().equals(
|
||||
conf.get(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, null))) {
|
||||
|
||||
Reference in New Issue
Block a user