1. Fix the "create catalog with resource" replay bug. If a user creates a catalog with `create catalog hive with resource xxx`, the resource may already have been dropped when the edit log is replayed, causing an NPE and preventing the FE from starting. This PR adds a new FE config, `disallow_create_catalog_with_resource` (default: true), so that `with resource` is no longer allowed; the syntax will be deprecated later. The replay path is also fixed to avoid the NPE.
2. Fix queries against two hive catalogs when one uses Kerberos authentication and the other does not. When a user creates two hive catalogs, one with simple auth and one with Kerberos auth, queries may fail with an error like: `Server asks us to fall back to SIMPLE auth, but this client is configured to only allow secure connections.` This PR adds a default property for hive catalogs, `"ipc.client.fallback-to-simple-auth-allowed" = "true"`, which is set automatically when a hive catalog is created to avoid the problem.
3. Fix the handling of `hdfsExists()`. When `hdfsExists()` returns a non-zero code, the caller should check whether a real error occurred or the file simply does not exist (see the sketch after this list).
4. Some code refactoring: avoid importing `org.apache.parquet.Strings`.
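For item 3, here is a minimal sketch of distinguishing "file not found" from a real failure when `hdfsExists()` returns non-zero. It is illustrative only: the helper name `check_path_exists` and the reliance on `errno`/`ENOENT` are assumptions about how libhdfs reports the two cases, not necessarily what this PR implements.

```cpp
// Hypothetical helper sketch: hdfsExists() returns non-zero both when the
// path is missing and when a real error occurs, so inspect errno to tell
// them apart instead of treating every non-zero code as "not found".
#include <cerrno>
#include <string>

#include "common/status.h"
#include "io/fs/hdfs.h"

namespace doris {

Status check_path_exists(hdfsFS fs, const std::string& path, bool* exists) {
    errno = 0;
    int ret = hdfsExists(fs, path.c_str());
    if (ret == 0) {
        *exists = true;
        return Status::OK();
    }
    if (errno == ENOENT) {
        // Non-zero return but no hard error: the path simply does not exist.
        *exists = false;
        return Status::OK();
    }
    // Anything else (permissions, connectivity, ...) is a genuine error.
    return Status::InternalError("hdfsExists failed for " + path +
                                 ", errno=" + std::to_string(errno));
}

} // namespace doris
```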
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "io/hdfs_builder.h"

#include <fmt/format.h>
#include <gen_cpp/PlanNodes_types.h>

#include <cstdlib>
#include <fstream>
#include <utility>
#include <vector>

#include "agent/utils.h"
#include "common/logging.h"
#include "io/fs/hdfs.h"
#include "util/string_util.h"
#include "util/uid_util.h"

namespace doris {

Status HDFSCommonBuilder::init_hdfs_builder() {
    hdfs_builder = hdfsNewBuilder();
    if (hdfs_builder == nullptr) {
        LOG(INFO) << "failed to init HDFSCommonBuilder, please check be/conf/hdfs-site.xml";
        return Status::InternalError(
                "failed to init HDFSCommonBuilder, please check be/conf/hdfs-site.xml");
    }
    hdfsBuilderSetForceNewInstance(hdfs_builder);
    return Status::OK();
}

Status HDFSCommonBuilder::run_kinit() {
    if (hdfs_kerberos_principal.empty() || hdfs_kerberos_keytab.empty()) {
        return Status::InvalidArgument("Invalid hdfs_kerberos_principal or hdfs_kerberos_keytab");
    }
    std::string ticket_path = TICKET_CACHE_PATH + generate_uuid_string();
    const char* krb_home = getenv("KRB_HOME");
    std::string krb_home_str(krb_home ? krb_home : "");
    fmt::memory_buffer kinit_command;
    if (krb_home_str.empty()) {
        fmt::format_to(kinit_command, "kinit -c {} -R -t {} -k {}", ticket_path,
                       hdfs_kerberos_keytab, hdfs_kerberos_principal);
    } else {
        // KRB_HOME is set in the environment, so use the kinit binary under it
        fmt::format_to(kinit_command, krb_home_str + "/bin/kinit -c {} -R -t {} -k {}",
                       ticket_path, hdfs_kerberos_keytab, hdfs_kerberos_principal);
    }
    VLOG_NOTICE << "kinit command: " << fmt::to_string(kinit_command);
    std::string msg;
    AgentUtils util;
    bool rc = util.exec_cmd(fmt::to_string(kinit_command), &msg);
    if (!rc) {
        return Status::InternalError("Kinit failed, errMsg: " + msg);
    }
#ifdef USE_LIBHDFS3
    hdfsBuilderSetPrincipal(hdfs_builder, hdfs_kerberos_principal.c_str());
#endif
    hdfsBuilderConfSetStr(hdfs_builder, "hadoop.security.kerberos.ticket.cache.path",
                          ticket_path.c_str());
    return Status::OK();
}

THdfsParams parse_properties(const std::map<std::string, std::string>& properties) {
    StringCaseMap<std::string> prop(properties.begin(), properties.end());
    std::vector<THdfsConf> hdfs_configs;
    THdfsParams hdfsParams;
    for (auto iter = prop.begin(); iter != prop.end();) {
        if (iter->first.compare(FS_KEY) == 0) {
            hdfsParams.__set_fs_name(iter->second);
            iter = prop.erase(iter);
        } else if (iter->first.compare(USER) == 0) {
            hdfsParams.__set_user(iter->second);
            iter = prop.erase(iter);
        } else if (iter->first.compare(KERBEROS_PRINCIPAL) == 0) {
            hdfsParams.__set_hdfs_kerberos_principal(iter->second);
            iter = prop.erase(iter);
        } else if (iter->first.compare(KERBEROS_KEYTAB) == 0) {
            hdfsParams.__set_hdfs_kerberos_keytab(iter->second);
            iter = prop.erase(iter);
        } else {
            THdfsConf item;
            item.key = iter->first;
            item.value = iter->second;
            hdfs_configs.push_back(item);
            iter = prop.erase(iter);
        }
    }
    if (!hdfsParams.__isset.user && std::getenv("HADOOP_USER_NAME") != nullptr) {
        hdfsParams.__set_user(std::getenv("HADOOP_USER_NAME"));
    }
    hdfsParams.__set_hdfs_conf(hdfs_configs);
    return hdfsParams;
}

Status createHDFSBuilder(const THdfsParams& hdfsParams, HDFSCommonBuilder* builder) {
    RETURN_IF_ERROR(builder->init_hdfs_builder());
    hdfsBuilderSetNameNode(builder->get(), hdfsParams.fs_name.c_str());
    // set kerberos conf
    if (hdfsParams.__isset.hdfs_kerberos_principal) {
        builder->need_kinit = true;
        builder->hdfs_kerberos_principal = hdfsParams.hdfs_kerberos_principal;
        hdfsBuilderSetUserName(builder->get(), hdfsParams.hdfs_kerberos_principal.c_str());
    } else if (hdfsParams.__isset.user) {
        hdfsBuilderSetUserName(builder->get(), hdfsParams.user.c_str());
#ifdef USE_HADOOP_HDFS
        hdfsBuilderSetKerb5Conf(builder->get(), nullptr);
        hdfsBuilderSetKeyTabFile(builder->get(), nullptr);
#endif
    }
    if (hdfsParams.__isset.hdfs_kerberos_keytab) {
        builder->need_kinit = true;
        builder->hdfs_kerberos_keytab = hdfsParams.hdfs_kerberos_keytab;
#ifdef USE_HADOOP_HDFS
        hdfsBuilderSetKeyTabFile(builder->get(), hdfsParams.hdfs_kerberos_keytab.c_str());
#endif
    }
    // set other conf
    if (hdfsParams.__isset.hdfs_conf) {
        for (const THdfsConf& conf : hdfsParams.hdfs_conf) {
            hdfsBuilderConfSetStr(builder->get(), conf.key.c_str(), conf.value.c_str());
#ifdef USE_HADOOP_HDFS
            // Set krb5.conf; java.security.krb5.conf should be defined in the catalog properties
            if (strcmp(conf.key.c_str(), "java.security.krb5.conf") == 0) {
                hdfsBuilderSetKerb5Conf(builder->get(), conf.value.c_str());
            }
#endif
        }
    }

    // Allow falling back to simple auth so the same client can talk to both
    // kerberized and non-kerberized HDFS clusters (see PR item 2).
    hdfsBuilderConfSetStr(builder->get(), "ipc.client.fallback-to-simple-auth-allowed", "true");

    if (builder->is_need_kinit()) {
        RETURN_IF_ERROR(builder->run_kinit());
    }

    return Status::OK();
}

Status createHDFSBuilder(const std::map<std::string, std::string>& properties,
                         HDFSCommonBuilder* builder) {
    THdfsParams hdfsParams = parse_properties(properties);
    return createHDFSBuilder(hdfsParams, builder);
}

} // namespace doris
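For context, a minimal usage sketch of the map-based entry point above, under the assumption that the property keys match the `FS_KEY` / `USER` / `KERBEROS_*` constants parsed by `parse_properties()`; the literal key strings and values shown here are placeholders.

```cpp
// Illustrative only: configure an hdfsBuilder from catalog-style properties.
// Key names and values are placeholders, not guaranteed to match the
// constants defined in io/hdfs_builder.h.
#include <map>
#include <string>

#include "common/status.h"
#include "io/hdfs_builder.h"

doris::Status build_example() {
    std::map<std::string, std::string> properties = {
            {"fs.defaultFS", "hdfs://nameservice1"},
            {"hadoop.username", "hadoop"},
            // With a kerberos principal/keytab present, createHDFSBuilder()
            // marks the builder as needing kinit and runs run_kinit().
            // {"hadoop.kerberos.principal", "doris/_HOST@EXAMPLE.COM"},
            // {"hadoop.kerberos.keytab", "/path/to/doris.keytab"},
    };
    doris::HDFSCommonBuilder builder;
    RETURN_IF_ERROR(doris::createHDFSBuilder(properties, &builder));
    // builder.get() now holds a configured hdfsBuilder* that can be passed
    // to hdfsBuilderConnect().
    return doris::Status::OK();
}
```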