[improvement](kerberos) disable hdfs fs handle cache to renew kerberos ticket at fixed interval (#21265)

Add a new BE config `kerberos_expiration_time_seconds`, default is 43200.
It is best set to half of the `ticket_lifetime` value in `krb5.conf` (the default corresponds to a common 24-hour ticket lifetime of 86400 seconds).
If an HDFS fs handle in the cache lives longer than this time, it will be marked invalid and recreated,
and the Kerberos ticket will be renewed in the process.
Author: Mingyu Chen
Date: 2023-07-04 17:13:34 +08:00
Committer: GitHub
Parent: c2b483529c
Commit: 13fb69550a
5 changed files with 54 additions and 27 deletions
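
Before the per-file diffs, a minimal, self-contained C++ sketch of the mechanism described above may help: a cached kerberized handle remembers its creation time and reports itself invalid once it is older than the configured expiration, so the caller drops it and reconnects, which re-runs kinit and renews the ticket. The names here (`CachedFsHandle`, `now_ms`) are illustrative only, not the actual Doris classes.

```cpp
#include <chrono>
#include <cstdint>

// Illustrative stand-in for a cached HDFS filesystem handle.
struct CachedFsHandle {
    explicit CachedFsHandle(bool is_kerberos)
            : _is_kerberos(is_kerberos), _create_time_ms(now_ms()) {}

    // A kerberized handle ages out once it is older than the configured
    // expiration; a non-kerberized handle is only invalidated on error.
    bool expired(int64_t expiration_seconds) const {
        return _is_kerberos &&
               now_ms() - _create_time_ms >
                       static_cast<uint64_t>(expiration_seconds) * 1000;
    }

private:
    static uint64_t now_ms() {
        using namespace std::chrono;
        return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
    }

    const bool _is_kerberos;
    const uint64_t _create_time_ms;
};
```

In the patch itself this check lives in `HdfsFileSystemHandle::invalid()` and the recreation happens in `HdfsFileSystemCache::get_connection()`, both shown in the diffs below.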

@@ -1028,6 +1028,7 @@ DEFINE_Int64(max_external_file_meta_cache_num, "20000");
DEFINE_Int32(rocksdb_max_write_buffer_number, "5");
DEFINE_Bool(allow_invalid_decimalv2_literal, "false");
DEFINE_mInt64(kerberos_expiration_time_seconds, "43200");
#ifdef BE_TEST
// test s3

@@ -1045,6 +1045,10 @@ DECLARE_Int32(rocksdb_max_write_buffer_number);
// Allow invalid decimalv2 literal for compatible with old version. Recommend set it false strongly.
DECLARE_mBool(allow_invalid_decimalv2_literal);
// The max expiration time of the kerberos ticket.
// If an hdfs filesystem with kerberos authentication lives longer
// than this time, it will be treated as expired.
DECLARE_mInt64(kerberos_expiration_time_seconds);
#ifdef BE_TEST
// test s3

@@ -78,7 +78,7 @@ private:
HdfsFileSystemCache() = default;
uint64 _hdfs_hash_code(const THdfsParams& hdfs_params);
Status _create_fs(const THdfsParams& hdfs_params, hdfsFS* fs);
Status _create_fs(const THdfsParams& hdfs_params, hdfsFS* fs, bool* is_kerberos);
void _clean_invalid();
void _clean_oldest();
};
@@ -423,9 +423,11 @@ HdfsFileSystemHandle* HdfsFileSystem::get_handle() {
// ************* HdfsFileSystemCache ******************
int HdfsFileSystemCache::MAX_CACHE_HANDLE = 64;
Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, hdfsFS* fs) {
Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, hdfsFS* fs,
bool* is_kerberos) {
HDFSCommonBuilder builder;
RETURN_IF_ERROR(createHDFSBuilder(hdfs_params, &builder));
*is_kerberos = builder.is_need_kinit();
hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get());
if (hdfs_fs == nullptr) {
return Status::IOError("failed to connect to hdfs {}: {}", hdfs_params.fs_name,
@@ -467,30 +469,33 @@ Status HdfsFileSystemCache::get_connection(const THdfsParams& hdfs_params,
auto it = _cache.find(hash_code);
if (it != _cache.end()) {
HdfsFileSystemHandle* handle = it->second.get();
if (handle->invalid()) {
hdfsFS hdfs_fs = nullptr;
RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs));
*fs_handle = new HdfsFileSystemHandle(hdfs_fs, false);
} else {
if (!handle->invalid()) {
handle->inc_ref();
*fs_handle = handle;
return Status::OK();
}
// fs handle is invalid, erase it.
_cache.erase(it);
LOG(INFO) << "erase the hdfs handle, fs name: " << hdfs_params.fs_name;
}
// not found in cache, or fs handle is invalid
// create a new one and try to put it into cache
hdfsFS hdfs_fs = nullptr;
bool is_kerberos = false;
RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs, &is_kerberos));
if (_cache.size() >= MAX_CACHE_HANDLE) {
_clean_invalid();
_clean_oldest();
}
if (_cache.size() < MAX_CACHE_HANDLE) {
std::unique_ptr<HdfsFileSystemHandle> handle =
std::make_unique<HdfsFileSystemHandle>(hdfs_fs, true, is_kerberos);
handle->inc_ref();
*fs_handle = handle.get();
_cache[hash_code] = std::move(handle);
} else {
hdfsFS hdfs_fs = nullptr;
RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs));
if (_cache.size() >= MAX_CACHE_HANDLE) {
_clean_invalid();
_clean_oldest();
}
if (_cache.size() < MAX_CACHE_HANDLE) {
std::unique_ptr<HdfsFileSystemHandle> handle =
std::make_unique<HdfsFileSystemHandle>(hdfs_fs, true);
handle->inc_ref();
*fs_handle = handle.get();
_cache[hash_code] = std::move(handle);
} else {
*fs_handle = new HdfsFileSystemHandle(hdfs_fs, false);
}
*fs_handle = new HdfsFileSystemHandle(hdfs_fs, false, is_kerberos);
}
}
return Status::OK();
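
Because the hunk above interleaves the old and new versions of `get_connection()` without +/- markers, the resulting lookup path after this change reads roughly as follows (condensed from the lines above; it relies on the surrounding types such as `Status`, `THdfsParams` and the cache members, so it is a readability aid rather than standalone code):

```cpp
// Reuse a valid cached handle; if the cached handle is invalid (e.g. its
// kerberos ticket aged out), erase it and fall through to create a fresh
// connection, which re-runs kinit when needed.
auto it = _cache.find(hash_code);
if (it != _cache.end()) {
    HdfsFileSystemHandle* handle = it->second.get();
    if (!handle->invalid()) {
        handle->inc_ref();
        *fs_handle = handle;
        return Status::OK();
    }
    _cache.erase(it);  // stale handle: drop it so a new one is built below
}
hdfsFS hdfs_fs = nullptr;
bool is_kerberos = false;
RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs, &is_kerberos));
if (_cache.size() >= MAX_CACHE_HANDLE) {
    _clean_invalid();
    _clean_oldest();
}
if (_cache.size() < MAX_CACHE_HANDLE) {
    auto handle = std::make_unique<HdfsFileSystemHandle>(hdfs_fs, true, is_kerberos);
    handle->inc_ref();
    *fs_handle = handle.get();
    _cache[hash_code] = std::move(handle);
} else {
    *fs_handle = new HdfsFileSystemHandle(hdfs_fs, false, is_kerberos);
}
return Status::OK();
```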

@@ -27,6 +27,7 @@
#include <string>
#include <vector>
#include "common/config.h"
#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "io/fs/hdfs.h"
@@ -41,14 +42,21 @@ struct FileInfo;
class HdfsFileSystemHandle {
public:
HdfsFileSystemHandle(hdfsFS fs, bool cached)
: hdfs_fs(fs), from_cache(cached), _ref_cnt(0), _last_access_time(0), _invalid(false) {}
HdfsFileSystemHandle(hdfsFS fs, bool cached, bool is_kerberos)
: hdfs_fs(fs),
from_cache(cached),
_is_kerberos(is_kerberos),
_ref_cnt(0),
_create_time(_now()),
_last_access_time(0),
_invalid(false) {}
~HdfsFileSystemHandle() {
DCHECK(_ref_cnt == 0);
if (hdfs_fs != nullptr) {
// Even if there is an error, the resources associated with the hdfsFS will be freed.
hdfsDisconnect(hdfs_fs);
// DO NOT call hdfsDisconnect(), or we will meet "Filesystem closed"
// even if we create a new one
// hdfsDisconnect(hdfs_fs);
}
hdfs_fs = nullptr;
}
@@ -67,7 +75,11 @@ public:
int ref_cnt() { return _ref_cnt; }
bool invalid() { return _invalid; }
bool invalid() {
return _invalid ||
(_is_kerberos &&
_now() - _create_time.load() > config::kerberos_expiration_time_seconds * 1000);
}
void set_invalid() { _invalid = true; }
@@ -77,8 +89,12 @@ public:
const bool from_cache;
private:
const bool _is_kerberos;
// the number of referenced client
std::atomic<int> _ref_cnt;
// For kerberos authentication, we need to save create time so that
// we can know if the kerberos ticket is expired.
std::atomic<uint64_t> _create_time;
// HdfsFileSystemCache try to remove the oldest handler when the cache is full
std::atomic<uint64_t> _last_access_time;
// Client will set invalid if error thrown, and HdfsFileSystemCache will not reuse this handler
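
For concreteness: `invalid()` compares elapsed wall-clock time against `kerberos_expiration_time_seconds * 1000`, and `_create_time` is initialized from `_now()`, so the comparison is presumably in milliseconds (an assumption; `_now()` is not shown in this diff). With the default of 43200 seconds, a kerberized handle is therefore dropped and rebuilt roughly every 12 hours:

```cpp
#include <cstdint>
#include <iostream>

int main() {
    // Default value of the new BE config introduced by this commit.
    const int64_t kerberos_expiration_time_seconds = 43200;
    // invalid() treats a kerberized handle as expired once
    // _now() - _create_time exceeds this many milliseconds.
    const int64_t threshold_ms = kerberos_expiration_time_seconds * 1000;
    std::cout << threshold_ms << " ms = "
              << threshold_ms / (1000 * 3600) << " hours" << std::endl;  // 43200000 ms = 12 hours
}
```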

@@ -72,6 +72,7 @@ Status HDFSCommonBuilder::run_kinit() {
#endif
hdfsBuilderConfSetStr(hdfs_builder, "hadoop.security.kerberos.ticket.cache.path",
ticket_path.c_str());
LOG(INFO) << "finished to run kinit command: " << fmt::to_string(kinit_command);
return Status::OK();
}